diff options
Diffstat (limited to 'src/pulsecore/protocol-simple.c')
-rw-r--r-- | src/pulsecore/protocol-simple.c | 114 |
1 files changed, 58 insertions, 56 deletions
diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index 8f9aed58..0ded5d26 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -67,7 +67,7 @@ typedef struct connection { PA_DECLARE_CLASS(connection); #define CONNECTION(o) (connection_cast(o)) -static PA_DEFINE_CHECK_TYPE(connection, connection_check_type, pa_msgobject_check_type); +static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject); struct pa_protocol_simple { pa_module *module; @@ -91,9 +91,9 @@ enum { }; enum { - MESSAGE_REQUEST_DATA, /* data requested from sink input from the main loop */ - MESSAGE_POST_DATA, /* data from source output to main loop */ - MESSAGE_DROP_CONNECTION /* Please drop a aconnection now */ + CONNECTION_MESSAGE_REQUEST_DATA, /* data requested from sink input from the main loop */ + CONNECTION_MESSAGE_POST_DATA, /* data from source output to main loop */ + CONNECTION_MESSAGE_DROP_CONNECTION /* Please drop a aconnection now */ }; @@ -102,29 +102,12 @@ enum { #define RECORD_BUFFER_SECONDS (5) #define RECORD_BUFFER_FRAGMENTS (100) -static void connection_free(pa_object *o) { - connection *c = CONNECTION(o); +static void connection_unlink(connection *c) { pa_assert(c); - if (c->playback.current_memblock) - pa_memblock_unref(c->playback.current_memblock); - - if (c->io) - pa_iochannel_free(c->io); - if (c->input_memblockq) - pa_memblockq_free(c->input_memblockq); - if (c->output_memblockq) - pa_memblockq_free(c->output_memblockq); - - pa_xfree(c); -} - -static void connection_drop(connection *c) { - pa_assert(c); - - if (!pa_idxset_remove_by_data(c->protocol->connections, c, NULL)) + if (!c->protocol) return; - + if (c->sink_input) { pa_sink_input_disconnect(c->sink_input); pa_sink_input_unref(c->sink_input); @@ -142,9 +125,30 @@ static void connection_drop(connection *c) { c->client = NULL; } + pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c); + c->protocol = NULL; connection_unref(c); } +static void connection_free(pa_object *o) { + connection *c = CONNECTION(o); + pa_assert(c); + + connection_unref(c); + + if (c->playback.current_memblock) + pa_memblock_unref(c->playback.current_memblock); + + if (c->io) + pa_iochannel_free(c->io); + if (c->input_memblockq) + pa_memblockq_free(c->input_memblockq); + if (c->output_memblockq) + pa_memblockq_free(c->output_memblockq); + + pa_xfree(c); +} + static int do_read(connection *c) { pa_memchunk chunk; ssize_t r; @@ -190,7 +194,7 @@ static int do_read(connection *c) { c->playback.memblock_index += r; - pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, &chunk, NULL); + pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, &chunk, NULL); pa_atomic_sub(&c->playback.missing, r); return 0; @@ -263,28 +267,28 @@ fail: pa_iochannel_free(c->io); c->io = NULL; - pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, NULL, NULL); + pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, 0, NULL, NULL); } else - connection_drop(c); + connection_unlink(c); } -static int connection_process_msg(pa_msgobject *o, int code, void*userdata, pa_memchunk *chunk) { +static int connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) { connection *c = CONNECTION(o); connection_assert_ref(c); switch (code) { - case MESSAGE_REQUEST_DATA: + case CONNECTION_MESSAGE_REQUEST_DATA: do_work(c); break; - case MESSAGE_POST_DATA: + case CONNECTION_MESSAGE_POST_DATA: /* pa_log("got data %u", chunk->length); */ pa_memblockq_push_align(c->output_memblockq, chunk); do_work(c); break; - case MESSAGE_DROP_CONNECTION: - connection_drop(c); + case CONNECTION_MESSAGE_DROP_CONNECTION: + connection_unlink(c); break; } @@ -294,13 +298,13 @@ static int connection_process_msg(pa_msgobject *o, int code, void*userdata, pa_m /*** sink_input callbacks ***/ /* Called from thread context */ -static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_memchunk *chunk) { +static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) { pa_sink_input *i = PA_SINK_INPUT(o); connection*c; - pa_assert(i); - c = i->userdata; - pa_assert(c); + pa_sink_input_assert_ref(i); + c = CONNECTION(i->userdata); + connection_assert_ref(c); switch (code) { @@ -330,7 +334,7 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, pa_ } default: - return pa_sink_input_process_msg(o, code, userdata, chunk); + return pa_sink_input_process_msg(o, code, userdata, offset, chunk); } } @@ -349,7 +353,7 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { /* pa_log("peeked %u %i", r >= 0 ? chunk->length: 0, r); */ if (c->dead && r < 0) - pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_DROP_CONNECTION, NULL, NULL, NULL); + pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_DROP_CONNECTION, NULL, 0, NULL, NULL); return r; } @@ -369,19 +373,20 @@ static void sink_input_drop_cb(pa_sink_input *i, size_t length) { if (new > old) { if (pa_atomic_add(&c->playback.missing, new - old) <= 0) - pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_REQUEST_DATA, NULL, NULL, NULL); + pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL); } } /* Called from main context */ static void sink_input_kill_cb(pa_sink_input *i) { - pa_assert(i); + pa_sink_input_assert_ref(i); - connection_drop(CONNECTION(i->userdata)); + connection_unlink(CONNECTION(i->userdata)); } /*** source_output callbacks ***/ +/* Called from thread context */ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) { connection *c; @@ -390,24 +395,22 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) pa_assert(c); pa_assert(chunk); - pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), MESSAGE_POST_DATA, NULL, chunk, NULL); + pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_POST_DATA, NULL, 0, chunk, NULL); } +/* Called from main context */ static void source_output_kill_cb(pa_source_output *o) { - connection*c; + pa_source_output_assert_ref(o); - pa_assert(o); - c = o->userdata; - pa_assert(c); - - connection_drop(c); + connection_unlink(CONNECTION(o->userdata)); } +/* Called from main context */ static pa_usec_t source_output_get_latency_cb(pa_source_output *o) { connection*c; pa_assert(o); - c = o->userdata; + c = CONNECTION(o->userdata); pa_assert(c); return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec); @@ -419,16 +422,16 @@ static void client_kill_cb(pa_client *client) { connection*c; pa_assert(client); - c = client->userdata; + c = CONNECTION(client->userdata); pa_assert(c); - connection_drop(c); + connection_unlink(c); } /*** pa_iochannel callbacks ***/ static void io_callback(pa_iochannel*io, void *userdata) { - connection *c = userdata; + connection *c = CONNECTION(userdata); pa_assert(io); pa_assert(c); @@ -453,7 +456,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) return; } - c = pa_msgobject_new(connection, connection_check_type); + c = pa_msgobject_new(connection); c->parent.parent.free = connection_free; c->parent.process_msg = connection_process_msg; c->io = io; @@ -547,7 +550,6 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) pa_source_output_put(c->source_output); } - pa_iochannel_set_callback(c->io, io_callback, c); pa_idxset_put(p->connections, c, NULL); @@ -555,7 +557,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) fail: if (c) - connection_drop(c); + connection_unlink(c); } pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *server, pa_module *m, pa_modargs *ma) { @@ -618,7 +620,7 @@ void pa_protocol_simple_free(pa_protocol_simple *p) { if (p->connections) { while((c = pa_idxset_first(p->connections, NULL))) - connection_drop(c); + connection_unlink(c); pa_idxset_free(p->connections, NULL, NULL); } |