From acb25b35102dfca08f66e155560f6c99cb8fa841 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Wed, 23 Jun 2004 23:17:30 +0000 Subject: main part of the native protocol git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@31 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/protocol-native.c | 228 +++++++++++++++++++++++++++----------------------- 1 file changed, 124 insertions(+), 104 deletions(-) (limited to 'src/protocol-native.c') diff --git a/src/protocol-native.c b/src/protocol-native.c index e9cca7c1..a39880b8 100644 --- a/src/protocol-native.c +++ b/src/protocol-native.c @@ -3,34 +3,19 @@ #include #include "protocol-native.h" +#include "protocol-native-spec.h" #include "packet.h" #include "client.h" #include "sourceoutput.h" #include "sinkinput.h" #include "pstream.h" #include "tagstruct.h" +#include "pdispatch.h" +#include "pstream-util.h" struct connection; struct protocol_native; -enum { - COMMAND_ERROR, - COMMAND_REPLY, - COMMAND_CREATE_PLAYBACK_STREAM, - COMMAND_DELETE_PLAYBACK_STREAM, - COMMAND_CREATE_RECORD_STREAM, - COMMAND_DELETE_RECORD_STREAM, - COMMAND_EXIT, - COMMAND_MAX -}; - -enum { - ERROR_ACCESS, - ERROR_COMMAND, - ERROR_ARGUMENT, - ERROR_EXIST -}; - struct record_stream { struct connection *connection; uint32_t index; @@ -41,6 +26,7 @@ struct record_stream { struct playback_stream { struct connection *connection; uint32_t index; + size_t qlength; struct sink_input *sink_input; struct memblockq *memblockq; }; @@ -50,6 +36,7 @@ struct connection { struct protocol_native *protocol; struct client *client; struct pstream *pstream; + struct pdispatch *pdispatch; struct idxset *record_streams, *playback_streams; }; @@ -60,6 +47,29 @@ struct protocol_native { struct idxset *connections; }; +static int sink_input_peek_cb(struct sink_input *i, struct memchunk *chunk); +static void sink_input_drop_cb(struct sink_input *i, size_t length); +static void sink_input_kill_cb(struct sink_input *i); +static uint32_t sink_input_get_latency_cb(struct sink_input *i); + +static void request_bytes(struct playback_stream*s); + +static int command_exit(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata); +static int command_create_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata); +static int command_delete_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata); + +static const struct pdispatch_command command_table[PA_COMMAND_MAX] = { + [PA_COMMAND_ERROR] = { NULL }, + [PA_COMMAND_REPLY] = { NULL }, + [PA_COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream }, + [PA_COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_playback_stream }, + [PA_COMMAND_CREATE_RECORD_STREAM] = { NULL }, + [PA_COMMAND_DELETE_RECORD_STREAM] = { NULL }, + [PA_COMMAND_EXIT] = { command_exit }, +}; + +/* structure management */ + static void record_stream_free(struct record_stream* r) { assert(r && r->connection); @@ -69,18 +79,28 @@ static void record_stream_free(struct record_stream* r) { free(r); } -static struct playback_stream* playback_stream_new(struct connection *c, struct sink *sink, struct sample_spec *ss, const char *name, size_t maxlength, size_t prebuf) { +static struct playback_stream* playback_stream_new(struct connection *c, struct sink *sink, struct pa_sample_spec *ss, const char *name, size_t qlen, size_t maxlength, size_t prebuf) { struct playback_stream *s; + assert(c && sink && s && name && qlen && maxlength && prebuf); s = malloc(sizeof(struct playback_stream)); assert (s); s->connection = c; + s->qlength = qlen; + s->sink_input = sink_input_new(sink, ss, name); assert(s->sink_input); - s->memblockq = memblockq_new(maxlength, sample_size(ss), prebuf); + s->sink_input->peek = sink_input_peek_cb; + s->sink_input->drop = sink_input_drop_cb; + s->sink_input->kill = sink_input_kill_cb; + s->sink_input->get_latency = sink_input_get_latency_cb; + s->sink_input->userdata = s; + + s->memblockq = memblockq_new(maxlength, pa_sample_size(ss), prebuf); assert(s->memblockq); idxset_put(c->playback_streams, s, &s->index); + request_bytes(s); return s; } @@ -99,7 +119,6 @@ static void connection_free(struct connection *c) { assert(c && c->protocol); idxset_remove_by_data(c->protocol->connections, c, NULL); - pstream_free(c->pstream); while ((r = idxset_first(c->record_streams, NULL))) record_stream_free(r); idxset_free(c->record_streams, NULL, NULL); @@ -108,67 +127,90 @@ static void connection_free(struct connection *c) { playback_stream_free(p); idxset_free(c->playback_streams, NULL, NULL); + pdispatch_free(c->pdispatch); + pstream_free(c->pstream); client_free(c->client); free(c); } -/*** pstream callbacks ***/ +static void request_bytes(struct playback_stream *s) { + struct tagstruct *t; + size_t l; + assert(s); -static void send_tagstruct(struct pstream *p, struct tagstruct *t) { - size_t length; - uint8_t *data; - struct packet *packet; - assert(p && t); - - data = tagstruct_free_data(t, &length); - assert(data && length); - packet = packet_new_dynamic(data, length); - assert(packet); - pstream_send_packet(p, packet); - packet_unref(packet); -} + if (!(l = memblockq_missing_to(s->memblockq, s->qlength))) + return; -static void send_error(struct pstream *p, uint32_t tag, uint32_t error) { - struct tagstruct *t = tagstruct_new(NULL, 0); + t = tagstruct_new(NULL, 0); assert(t); - tagstruct_putu32(t, COMMAND_ERROR); - tagstruct_putu32(t, tag); - tagstruct_putu32(t, error); - send_tagstruct(p, t); + tagstruct_putu32(t, PA_COMMAND_REQUEST); + tagstruct_putu32(t, s->index); + tagstruct_putu32(t, l); + pstream_send_tagstruct(s->connection->pstream, t); } -static void send_simple_ack(struct pstream *p, uint32_t tag) { - struct tagstruct *t = tagstruct_new(NULL, 0); - assert(t); - tagstruct_putu32(t, COMMAND_REPLY); - tagstruct_putu32(t, tag); - send_tagstruct(p, t); +/*** sinkinput callbacks ***/ + +static int sink_input_peek_cb(struct sink_input *i, struct memchunk *chunk) { + struct playback_stream *s; + assert(i && i->userdata && chunk); + s = i->userdata; + + if (memblockq_peek(s->memblockq, chunk) < 0) + return -1; + + return 0; } -struct command { - int (*func)(struct connection *c, uint32_t tag, struct tagstruct *t); -}; +static void sink_input_drop_cb(struct sink_input *i, size_t length) { + struct playback_stream *s; + assert(i && i->userdata && length); + s = i->userdata; -static int command_create_playback_stream(struct connection *c, uint32_t tag, struct tagstruct *t) { + memblockq_drop(s->memblockq, length); + request_bytes(s); +} + +static void sink_input_kill_cb(struct sink_input *i) { struct playback_stream *s; - size_t maxlength, prebuf; + assert(i && i->userdata); + s = i->userdata; + + playback_stream_free(s); +} + +static uint32_t sink_input_get_latency_cb(struct sink_input *i) { + struct playback_stream *s; + assert(i && i->userdata); + s = i->userdata; + + return pa_samples_usec(memblockq_get_length(s->memblockq), &s->sink_input->sample_spec); +} + +/*** pdispatch callbacks ***/ + +static int command_create_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) { + struct connection *c = userdata; + struct playback_stream *s; + size_t maxlength, prebuf, qlength; uint32_t sink_index; const char *name; - struct sample_spec ss; + struct pa_sample_spec ss; struct tagstruct *reply; struct sink *sink; assert(c && t && c->protocol && c->protocol->core); if (tagstruct_gets(t, &name) < 0 || tagstruct_get_sample_spec(t, &ss) < 0 || - tagstruct_getu32(t, &sink_index) < 0 || + tagstruct_getu32(t, &sink_index) < 0 || + tagstruct_getu32(t, &qlength) < 0 || tagstruct_getu32(t, &maxlength) < 0 || tagstruct_getu32(t, &prebuf) < 0 || !tagstruct_eof(t)) return -1; if (!c->authorized) { - send_error(c->pstream, tag, ERROR_ACCESS); + pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS); return 0; } @@ -178,25 +220,28 @@ static int command_create_playback_stream(struct connection *c, uint32_t tag, st sink = idxset_get_by_index(c->protocol->core->sinks, sink_index); if (!sink) { - send_error(c->pstream, tag, ERROR_EXIST); + pstream_send_error(c->pstream, tag, PA_ERROR_EXIST); return 0; } - if (!(s = playback_stream_new(c, sink, &ss, name, maxlength, prebuf))) { - send_error(c->pstream, tag, ERROR_ARGUMENT); + if (!(s = playback_stream_new(c, sink, &ss, name, qlength, maxlength, prebuf))) { + pstream_send_error(c->pstream, tag, PA_ERROR_INVALID); return 0; } reply = tagstruct_new(NULL, 0); assert(reply); - tagstruct_putu32(reply, COMMAND_REPLY); + tagstruct_putu32(reply, PA_COMMAND_REPLY); tagstruct_putu32(reply, tag); tagstruct_putu32(reply, s->index); - send_tagstruct(c->pstream, reply); + assert(s->sink_input); + tagstruct_putu32(reply, s->sink_input->index); + pstream_send_tagstruct(c->pstream, reply); return 0; } -static int command_delete_playback_stream(struct connection *c, uint32_t tag, struct tagstruct *t) { +static int command_delete_playback_stream(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) { + struct connection *c = userdata; uint32_t channel; struct playback_stream *s; assert(c && t); @@ -206,78 +251,50 @@ static int command_delete_playback_stream(struct connection *c, uint32_t tag, st return -1; if (!c->authorized) { - send_error(c->pstream, tag, ERROR_ACCESS); + pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS); return 0; } if (!(s = idxset_get_by_index(c->playback_streams, channel))) { - send_error(c->pstream, tag, ERROR_EXIST); + pstream_send_error(c->pstream, tag, PA_ERROR_EXIST); return 0; } - send_simple_ack(c->pstream, tag); + pstream_send_simple_ack(c->pstream, tag); return 0; } -static int command_exit(struct connection *c, uint32_t tag, struct tagstruct *t) { +static int command_exit(struct pdispatch *pd, uint32_t command, uint32_t tag, struct tagstruct *t, void *userdata) { + struct connection *c = userdata; assert(c && t); if (!tagstruct_eof(t)) return -1; if (!c->authorized) { - send_error(c->pstream, tag, ERROR_ACCESS); + pstream_send_error(c->pstream, tag, PA_ERROR_ACCESS); return 0; } - assert(c->protocol && c->protocol->core); - mainloop_quit(c->protocol->core->mainloop, -1); - send_simple_ack(c->pstream, tag); /* nonsense */ + assert(c->protocol && c->protocol->core && c->protocol->core->mainloop); + c->protocol->core->mainloop->quit(c->protocol->core->mainloop, 0); + pstream_send_simple_ack(c->pstream, tag); /* nonsense */ return 0; } -static const struct command commands[] = { - [COMMAND_ERROR] = { NULL }, - [COMMAND_REPLY] = { NULL }, - [COMMAND_CREATE_PLAYBACK_STREAM] = { command_create_playback_stream }, - [COMMAND_DELETE_PLAYBACK_STREAM] = { command_delete_playback_stream }, - [COMMAND_CREATE_RECORD_STREAM] = { NULL }, - [COMMAND_DELETE_RECORD_STREAM] = { NULL }, - [COMMAND_EXIT] = { command_exit }, -}; +/*** pstream callbacks ***/ + static int packet_callback(struct pstream *p, struct packet *packet, void *userdata) { struct connection *c = userdata; - uint32_t tag, command; - struct tagstruct *ts = NULL; assert(p && packet && packet->data && c); - if (packet->length <= 8) - goto fail; - - ts = tagstruct_new(packet->data, packet->length); - assert(ts); - - if (tagstruct_getu32(ts, &command) < 0 || - tagstruct_getu32(ts, &tag) < 0) - goto fail; - - if (command >= COMMAND_MAX || !commands[command].func) - send_error(p, tag, ERROR_COMMAND); - else if (commands[command].func(c, tag, ts) < 0) - goto fail; + if (pdispatch_run(c->pdispatch, packet, c) < 0) { + fprintf(stderr, "protocol-native: invalid packet.\n"); + return -1; + } - tagstruct_free(ts); - return 0; - -fail: - if (ts) - tagstruct_free(ts); - - fprintf(stderr, "protocol-native: invalid packet.\n"); - return -1; - } static int memblock_callback(struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata) { @@ -326,6 +343,9 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us pstream_set_recieve_memblock_callback(c->pstream, memblock_callback, c); pstream_set_die_callback(c->pstream, die_callback, c); + c->pdispatch = pdispatch_new(p->core->mainloop, command_table, PA_COMMAND_MAX); + assert(c->pdispatch); + c->record_streams = idxset_new(NULL, NULL); c->playback_streams = idxset_new(NULL, NULL); assert(c->record_streams && c->playback_streams); -- cgit