diff options
Diffstat (limited to 'src/pulsecore/protocol-simple.c')
-rw-r--r-- | src/pulsecore/protocol-simple.c | 306 |
1 files changed, 218 insertions, 88 deletions
diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index 31ad6ddd..288cf87a 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -25,7 +25,6 @@ #include <config.h> #endif -#include <assert.h> #include <stdlib.h> #include <limits.h> #include <stdio.h> @@ -54,13 +53,13 @@ struct connection { pa_source_output *source_output; pa_client *client; pa_memblockq *input_memblockq, *output_memblockq; - pa_defer_event *defer_event; int dead; struct { pa_memblock *current_memblock; size_t memblock_index, fragment_size; + pa_atomic_int missing; } playback; }; @@ -69,35 +68,52 @@ struct pa_protocol_simple { pa_core *core; pa_socket_server*server; pa_idxset *connections; + + pa_asyncmsgq *asyncmsgq; + enum { RECORD = 1, PLAYBACK = 2, DUPLEX = 3 } mode; + pa_sample_spec sample_spec; char *source_name, *sink_name; }; +enum { + SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */ +}; + +enum { + MESSAGE_REQUEST_DATA, /* data from source output to main loop */ + MESSAGE_POST_DATA /* data from source output to main loop */ +}; + + #define PLAYBACK_BUFFER_SECONDS (.5) #define PLAYBACK_BUFFER_FRAGMENTS (10) #define RECORD_BUFFER_SECONDS (5) #define RECORD_BUFFER_FRAGMENTS (100) static void connection_free(struct connection *c) { - assert(c); + pa_assert(c); pa_idxset_remove_by_data(c->protocol->connections, c, NULL); - if (c->playback.current_memblock) - pa_memblock_unref(c->playback.current_memblock); if (c->sink_input) { pa_sink_input_disconnect(c->sink_input); pa_sink_input_unref(c->sink_input); } + if (c->source_output) { pa_source_output_disconnect(c->source_output); pa_source_output_unref(c->source_output); } + + if (c->playback.current_memblock) + pa_memblock_unref(c->playback.current_memblock); + if (c->client) pa_client_free(c->client); if (c->io) @@ -106,8 +122,7 @@ static void connection_free(struct connection *c) { pa_memblockq_free(c->input_memblockq); if (c->output_memblockq) pa_memblockq_free(c->output_memblockq); - if (c->defer_event) - c->protocol->core->mainloop->defer_free(c->defer_event); + pa_xfree(c); } @@ -115,27 +130,37 @@ static int do_read(struct connection *c) { pa_memchunk chunk; ssize_t r; size_t l; + void *p; - if (!c->sink_input || !(l = pa_memblockq_missing(c->input_memblockq))) + pa_assert(c); + + if (!c->sink_input || !(l = pa_atomic_load(&c->playback.missing))) return 0; if (l > c->playback.fragment_size) l = c->playback.fragment_size; if (c->playback.current_memblock) - if (c->playback.current_memblock->length - c->playback.memblock_index < l) { + if (pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index < l) { pa_memblock_unref(c->playback.current_memblock); c->playback.current_memblock = NULL; c->playback.memblock_index = 0; } if (!c->playback.current_memblock) { - c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2); - assert(c->playback.current_memblock && c->playback.current_memblock->length >= l); + pa_assert_se(c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, l)); c->playback.memblock_index = 0; } - if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) { + p = pa_memblock_acquire(c->playback.current_memblock); + r = pa_iochannel_read(c->io, (uint8_t*) p + c->playback.memblock_index, l); + pa_memblock_release(c->playback.current_memblock); + + if (r <= 0) { + + if (errno == EINTR || errno == EAGAIN) + return 0; + pa_log_debug("read(): %s", r == 0 ? "EOF" : pa_cstrerror(errno)); return -1; } @@ -143,14 +168,10 @@ static int do_read(struct connection *c) { chunk.memblock = c->playback.current_memblock; chunk.index = c->playback.memblock_index; chunk.length = r; - assert(chunk.memblock); c->playback.memblock_index += r; - assert(c->input_memblockq); - pa_memblockq_push_align(c->input_memblockq, &chunk); - assert(c->sink_input); - pa_sink_notify(c->sink_input->sink); + pa_asyncmsgq_post(c->protocol->asyncmsgq, c, MESSAGE_POST_DATA, NULL, &chunk, NULL, NULL); return 0; } @@ -158,35 +179,41 @@ static int do_read(struct connection *c) { static int do_write(struct connection *c) { pa_memchunk chunk; ssize_t r; + void *p; + + p_assert(c); if (!c->source_output) return 0; - assert(c->output_memblockq); if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0) return 0; - assert(chunk.memblock && chunk.length); + pa_assert(chunk.memblock); + pa_assert(chunk.length); + + p = pa_memblock_acquire(chunk.memblock); + r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length); + pa_memblock_release(chunk.memblock); + + pa_memblock_unref(chunk.memblock); + + if (r < 0) { + + if (errno == EINTR || errno == EAGAIN) + return 0; - if ((r = pa_iochannel_write(c->io, (uint8_t*) chunk.memblock->data+chunk.index, chunk.length)) < 0) { - pa_memblock_unref(chunk.memblock); pa_log("write(): %s", pa_cstrerror(errno)); return -1; } pa_memblockq_drop(c->output_memblockq, &chunk, r); - pa_memblock_unref(chunk.memblock); - - pa_source_notify(c->source_output->source); - + return 0; } static void do_work(struct connection *c) { - assert(c); - - assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); - c->protocol->core->mainloop->defer_enable(c->defer_event, 0); + pa_assert(c); if (c->dead) return; @@ -207,103 +234,148 @@ static void do_work(struct connection *c) { fail: if (c->sink_input) { + + /* If there is a sink input, we first drain what we already have read before shutting down the connection */ c->dead = 1; pa_iochannel_free(c->io); c->io = NULL; pa_memblockq_prebuf_disable(c->input_memblockq); - pa_sink_notify(c->sink_input->sink); } else connection_free(c); } /*** sink_input callbacks ***/ -static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { +/* Called from thread context */ +static int sink_input_process_msg(pa_sink_input *i, int code, void *userdata, const pa_memchunk *chunk) { struct connection*c; - assert(i && i->userdata && chunk); + + pa_assert(i); c = i->userdata; + pa_assert(c); - if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) { + switch (code) { - if (c->dead) - connection_free(c); + case SINK_INPUT_MESSAGE_POST_DATA: { + pa_assert(chunk); - return -1; + /* New data from the main loop */ + pa_memblockq_push_align(c->input_memblockq, chunk); + return 0; + } + + case PA_SINK_INPUT_MESSAGE_GET_LATENCY: { + pa_usec_t *r = userdata; + + *r = pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec); + + /* Fall through, the default handler will add in the extra + * latency added by the resampler */ + } + + default: + return pa_sink_input_process_msg(i, code, userdata); } +} - return 0; +/* Called from thread context */ +static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { + struct connection*c; + + pa_assert(i); + c = i->userdata; + pa_assert(c); + pa_assert(chunk); + + r = pa_memblockq_peek(c->input_memblockq, chunk); + + if (c->dead && r < 0) + connection_free(c); + + return r; } +/* Called from thread context */ static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) { struct connection*c = i->userdata; - assert(i && c && length); + size_t old, new; + + pa_assert(i); + pa_assert(c); + pa_assert(length); + old = pa_memblockq_missing(c->input_memblockq); pa_memblockq_drop(c->input_memblockq, chunk, length); + new = pa_memblockq_missing(c->input_memblockq); - /* do something */ - assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); - c->protocol->core->mainloop->defer_enable(c->defer_event, 1); + pa_atomic_store(&c->playback.missing, &new); + + if (new > old) + pa_asyncmsgq_post(c->protocol->asyncmsgq, c, MESSAGE_REQUEST_DATA, NULL, NULL, NULL, NULL); } +/* Called from main context */ static void sink_input_kill_cb(pa_sink_input *i) { - assert(i && i->userdata); + pa_assert(i); + pa_assert(i->userdata); + connection_free((struct connection *) i->userdata); } - -static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i) { - struct connection*c = i->userdata; - assert(i && c); - return pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec); -} - /*** source_output callbacks ***/ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) { - struct connection *c = o->userdata; - assert(o && c && chunk); - - pa_memblockq_push(c->output_memblockq, chunk); + struct connection *c; + + pa_assert(o); + c = o->userdata; + pa_assert(c); + pa_assert(chunk); - /* do something */ - assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); - c->protocol->core->mainloop->defer_enable(c->defer_event, 1); + pa_asyncmsgq_post(c->protocol->asyncmsgq, c, MESSAGE_REQUEST_DATA, NULL, chunk, NULL, NULL); } static void source_output_kill_cb(pa_source_output *o) { - assert(o && o->userdata); - connection_free((struct connection *) o->userdata); + struct connection*c; + + pa_assert(o); + c = o->userdata; + pa_assert(c); + + connection_free(c); } static pa_usec_t source_output_get_latency_cb(pa_source_output *o) { - struct connection*c = o->userdata; - assert(o && c); + struct connection*c; + + pa_assert(o); + c = o->userdata; + pa_assert(c); + return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec); } /*** client callbacks ***/ -static void client_kill_cb(pa_client *c) { - assert(c && c->userdata); - connection_free((struct connection *) c->userdata); +static void client_kill_cb(pa_client *client) { + struct connection*c; + + pa_assert(client); + c = client->userdata; + pa_assert(c); + + connection_free(client); } /*** pa_iochannel callbacks ***/ static void io_callback(pa_iochannel*io, void *userdata) { struct connection *c = userdata; - assert(io && c && c->io == io); - - do_work(c); -} -/*** fixed callback ***/ - -static void defer_callback(pa_mainloop_api*a, pa_defer_event *e, void *userdata) { - struct connection *c = userdata; - assert(a && c && c->defer_event == e); + pa_assert(io); + pa_assert(c); do_work(c); } @@ -314,7 +386,10 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) pa_protocol_simple *p = userdata; struct connection *c = NULL; char cname[256]; - assert(s && io && p); + + pa_assert(s); + pa_assert(io); + pa_assert(p); if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) { pa_log("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS); @@ -322,25 +397,25 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) return; } - c = pa_xmalloc(sizeof(struct connection)); + c = pa_xnew(struct connection, 1); c->io = io; c->sink_input = NULL; c->source_output = NULL; - c->defer_event = NULL; c->input_memblockq = c->output_memblockq = NULL; c->protocol = p; c->playback.current_memblock = NULL; c->playback.memblock_index = 0; c->playback.fragment_size = 0; c->dead = 0; + pa_atomic_store(&c->playback.missing, 0); pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname)); - c->client = pa_client_new(p->core, __FILE__, cname); - assert(c->client); + pa_assert_se(c->client = pa_client_new(p->core, __FILE__, cname)); c->client->owner = p->module; c->client->kill = client_kill_cb; c->client->userdata = c; + if (p->mode & PLAYBACK) { pa_sink_input_new_data data; size_t l; @@ -372,11 +447,13 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) (size_t) -1, l/PLAYBACK_BUFFER_FRAGMENTS, NULL); - assert(c->input_memblockq); + pa_assert(c->input_memblockq); pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5); c->playback.fragment_size = l/10; - pa_sink_notify(c->sink_input->sink); + pa_atomic_store(&c->playback.missing, pa_memblockq_missing(c->input_memblockq)); + + pa_sink_input_put(c->sink_input); } if (p->mode & RECORD) { @@ -409,16 +486,14 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) 0, NULL); pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2); - pa_source_notify(c->source_output->source); + + pa_source_output_put(c->source_output); } + pa_iochannel_set_callback(c->io, io_callback, c); pa_idxset_put(p->connections, c, NULL); - - c->defer_event = p->core->mainloop->defer_new(p->core->mainloop, defer_callback, c); - assert(c->defer_event); - p->core->mainloop->defer_enable(c->defer_event, 0); - + return; fail: @@ -426,16 +501,60 @@ fail: connection_free(c); } +static void asyncmsgq_cb(pa_mainloop_api*api, pa_io_event* e, int fd, pa_io_event_flags_t events, void *userdata) { + pa_protocol_simple *p = userdata; + int do_some_work = 0; + + pa_assert(pa_asyncmsgq_get_fd(p->asyncmsgq) == fd); + pa_assert(events == PA_IO_EVENT_INPUT); + + pa_asyncmsgq_after_poll(p->asyncmsgq); + + for (;;) { + int code; + void *object, *data; + + /* Check whether there is a message for us to process */ + while (pa_asyncmsgq_get(p->asyncmsgq, &object, &code, &data) == 0) { + + connection *c = object; + + pa_assert(c); + + switch (code) { + + case MESSAGE_REQUEST_DATA: + do_work(c); + break; + + case MESSAGE_POST_DATA: + pa_memblockq_push(c->output_memblockq, chunk); + do_work(c); + break; + } + + pa_asyncmsgq_done(p->asyncmsgq); + } + + if (pa_asyncmsgq_before_poll(p->asyncmsgq) == 0) + break; + } +} + pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *server, pa_module *m, pa_modargs *ma) { pa_protocol_simple* p = NULL; int enable; - assert(core && server && ma); + + pa_assert(core); + pa_assert(server); + pa_assert(ma); - p = pa_xmalloc0(sizeof(pa_protocol_simple)); + p = pa_xnew0(pa_protocol_simple, 1); p->module = m; p->core = core; p->server = server; p->connections = pa_idxset_new(NULL, NULL); + pa_assert_se(p->asyncmsgq = pa_asyncmsgq_new(0)); p->sample_spec = core->default_sample_spec; if (pa_modargs_get_sample_spec(ma, &p->sample_spec) < 0) { @@ -467,18 +586,22 @@ pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *serv pa_socket_server_set_callback(p->server, on_connection, p); + pa_assert_se(pa_asyncmsgq_before_poll(p->asyncmsgq) == 0); + pa_assert_se(p->asyncmsgq_event = core->mainloop->io_event_new(core->mainloop, pa_asyncmsgq_get_fd(p->asyncmsgq), PA_IO_EVENT_INPUT, p)); + return p; fail: if (p) pa_protocol_simple_free(p); + return NULL; } void pa_protocol_simple_free(pa_protocol_simple *p) { struct connection *c; - assert(p); + pa_assert(p); if (p->connections) { while((c = pa_idxset_first(p->connections, NULL))) @@ -489,6 +612,13 @@ void pa_protocol_simple_free(pa_protocol_simple *p) { if (p->server) pa_socket_server_unref(p->server); + + if (p->asyncmsgq) { + c->mainloop->io_event_free(c->asyncmsgq_event); + pa_asyncmsgq_after_poll(c->asyncmsgq); + pa_asyncmsgq_free(p->asyncmsgq); + } + pa_xfree(p); } |