From 8df6529ee6d1fb53f5fffece06fa820b393daebd Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Wed, 14 May 2008 00:42:46 +0000 Subject: some fixes to make the simple protocol work on glitch-free again git-svn-id: file:///home/lennart/svn/public/pulseaudio/branches/glitch-free@2414 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/pulsecore/protocol-simple.c | 126 +++++++++++++++++++++++++++------------- 1 file changed, 87 insertions(+), 39 deletions(-) diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index 8ec38fe4..cbe48440 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -32,6 +32,7 @@ #include #include +#include #include #include @@ -42,6 +43,7 @@ #include #include #include +#include #include "protocol-simple.h" @@ -57,12 +59,13 @@ typedef struct connection { pa_client *client; pa_memblockq *input_memblockq, *output_memblockq; - int dead; + pa_bool_t dead; struct { pa_memblock *current_memblock; - size_t memblock_index, fragment_size; + size_t memblock_index; pa_atomic_t missing; + pa_bool_t underrun; } playback; } connection; @@ -101,7 +104,8 @@ enum { #define PLAYBACK_BUFFER_SECONDS (.5) #define PLAYBACK_BUFFER_FRAGMENTS (10) #define RECORD_BUFFER_SECONDS (5) -#define RECORD_BUFFER_FRAGMENTS (100) +#define DEFAULT_SINK_LATENCY (300*PA_USEC_PER_MSEC) +#define DEFAULT_SOURCE_LATENCY (300*PA_USEC_PER_MSEC) static void connection_unlink(connection *c) { pa_assert(c); @@ -140,8 +144,6 @@ static void connection_free(pa_object *o) { connection *c = CONNECTION(o); pa_assert(c); - connection_unref(c); - if (c->playback.current_memblock) pa_memblock_unref(c->playback.current_memblock); @@ -158,27 +160,33 @@ static int do_read(connection *c) { ssize_t r; size_t l; void *p; + size_t space; connection_assert_ref(c); if (!c->sink_input || (l = pa_atomic_load(&c->playback.missing)) <= 0) return 0; - if (l > c->playback.fragment_size) - l = c->playback.fragment_size; + if (c->playback.current_memblock) { - if (c->playback.current_memblock) - if (pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index < l) { + space = pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index; + + if (space <= 0) { pa_memblock_unref(c->playback.current_memblock); c->playback.current_memblock = NULL; - c->playback.memblock_index = 0; } + } if (!c->playback.current_memblock) { - pa_assert_se(c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, l)); + pa_assert_se(c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, 0)); c->playback.memblock_index = 0; + + space = pa_memblock_get_length(c->playback.current_memblock); } + if (l > space) + l = space; + p = pa_memblock_acquire(c->playback.current_memblock); r = pa_iochannel_read(c->io, (uint8_t*) p + c->playback.memblock_index, l); pa_memblock_release(c->playback.current_memblock); @@ -248,16 +256,16 @@ static void do_work(connection *c) { if (c->dead) return; - if (pa_iochannel_is_readable(c->io)) { + if (pa_iochannel_is_readable(c->io)) if (do_read(c) < 0) goto fail; - } else if (pa_iochannel_is_hungup(c->io)) + + if (!c->sink_input && pa_iochannel_is_hungup(c->io)) goto fail; - if (pa_iochannel_is_writable(c->io)) { + if (pa_iochannel_is_writable(c->io)) if (do_write(c) < 0) goto fail; - } return; @@ -266,7 +274,7 @@ fail: if (c->sink_input) { /* If there is a sink input, we first drain what we already have read before shutting down the connection */ - c->dead = 1; + c->dead = TRUE; pa_iochannel_free(c->io); c->io = NULL; @@ -318,15 +326,19 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int /* New data from the main loop */ pa_memblockq_push_align(c->input_memblockq, chunk); + if (pa_memblockq_is_readable(c->input_memblockq) && c->playback.underrun) { + pa_log_debug("Requesting rewind due to end of underrun."); + pa_sink_input_request_rewind(c->sink_input, 0, FALSE, TRUE); + } + /* pa_log("got data, %u", pa_memblockq_get_length(c->input_memblockq)); */ return 0; } - case SINK_INPUT_MESSAGE_DISABLE_PREBUF: { + case SINK_INPUT_MESSAGE_DISABLE_PREBUF: pa_memblockq_prebuf_disable(c->input_memblockq); return 0; - } case PA_SINK_INPUT_MESSAGE_GET_LATENCY: { pa_usec_t *r = userdata; @@ -345,32 +357,60 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int /* Called from thread context */ static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) { connection *c; - int r; - pa_assert(i); + pa_sink_input_assert_ref(i); c = CONNECTION(i->userdata); connection_assert_ref(c); pa_assert(chunk); - if ((r = pa_memblockq_peek(c->input_memblockq, chunk)) < 0) { + if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) { + + c->playback.underrun = TRUE; if (c->dead && pa_sink_input_safe_to_remove(i)) pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_UNLINK_CONNECTION, NULL, 0, NULL, NULL); + return -1; } else { - size_t old, new; + size_t m; + + c->playback.underrun = FALSE; - old = pa_memblockq_missing(c->input_memblockq); pa_memblockq_drop(c->input_memblockq, chunk->length); - new = pa_memblockq_missing(c->input_memblockq); + m = pa_memblockq_pop_missing(c->input_memblockq); - if (new > old) { - if (pa_atomic_add(&c->playback.missing, new - old) <= 0) + if (m > 0) + if (pa_atomic_add(&c->playback.missing, m) <= 0) pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL); - } + + return 0; } +} + +/* Called from thread context */ +static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) { + connection *c; - return r; + pa_sink_input_assert_ref(i); + c = CONNECTION(i->userdata); + connection_assert_ref(c); + + /* If we are in an underrun, then we don't rewind */ + if (i->thread_info.underrun_for > 0) + return; + + pa_memblockq_rewind(c->input_memblockq, nbytes); +} + +/* Called from thread context */ +static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) { + connection *c; + + pa_sink_input_assert_ref(i); + c = CONNECTION(i->userdata); + connection_assert_ref(c); + + pa_memblockq_set_maxrewind(c->input_memblockq, nbytes); } /* Called from main context */ @@ -386,7 +426,7 @@ static void sink_input_kill_cb(pa_sink_input *i) { static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) { connection *c; - pa_assert(o); + pa_source_output_assert_ref(o); c = CONNECTION(o->userdata); pa_assert(c); pa_assert(chunk); @@ -405,7 +445,7 @@ static void source_output_kill_cb(pa_source_output *o) { static pa_usec_t source_output_get_latency_cb(pa_source_output *o) { connection*c; - pa_assert(o); + pa_source_output_assert_ref(o); c = CONNECTION(o->userdata); pa_assert(c); @@ -440,7 +480,7 @@ static void io_callback(pa_iochannel*io, void *userdata) { static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) { pa_protocol_simple *p = userdata; connection *c = NULL; - char cname[256]; + char cname[256], pname[128]; pa_assert(s); pa_assert(io); @@ -462,12 +502,14 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) c->protocol = p; c->playback.current_memblock = NULL; c->playback.memblock_index = 0; - c->playback.fragment_size = 0; - c->dead = 0; + c->dead = FALSE; + c->playback.underrun = TRUE; pa_atomic_store(&c->playback.missing, 0); - pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname)); + pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname)); + pa_snprintf(cname, sizeof(cname), "Simple client (%s)", pname); pa_assert_se(c->client = pa_client_new(p->core, __FILE__, cname)); + pa_proplist_sets(c->client->proplist, "simple-protocol.peer", pname); c->client->module = p->module; c->client->kill = client_kill_cb; c->client->userdata = c; @@ -500,21 +542,24 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) c->sink_input->parent.process_msg = sink_input_process_msg; c->sink_input->pop = sink_input_pop_cb; + c->sink_input->process_rewind = sink_input_process_rewind_cb; + c->sink_input->update_max_rewind = sink_input_update_max_rewind_cb; c->sink_input->kill = sink_input_kill_cb; c->sink_input->userdata = c; + pa_sink_input_set_requested_latency(c->sink_input, DEFAULT_SINK_LATENCY); + l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS); c->input_memblockq = pa_memblockq_new( 0, l, - 0, + l, pa_frame_size(&p->sample_spec), (size_t) -1, l/PLAYBACK_BUFFER_FRAGMENTS, 0, NULL); - pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5); - c->playback.fragment_size = l/PLAYBACK_BUFFER_FRAGMENTS; + pa_iochannel_socket_set_rcvbuf(io, l); pa_atomic_store(&c->playback.missing, pa_memblockq_missing(c->input_memblockq)); @@ -551,6 +596,8 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) c->source_output->get_latency = source_output_get_latency_cb; c->source_output->userdata = c; + pa_source_output_set_requested_latency(c->source_output, DEFAULT_SOURCE_LATENCY); + l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS); c->output_memblockq = pa_memblockq_new( 0, @@ -561,7 +608,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) 0, 0, NULL); - pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2); + pa_iochannel_socket_set_sndbuf(io, l); pa_source_output_put(c->source_output); } @@ -582,6 +629,7 @@ pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *serv pa_assert(core); pa_assert(server); + pa_assert(m); pa_assert(ma); p = pa_xnew0(pa_protocol_simple, 1); @@ -606,7 +654,7 @@ pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *serv } p->mode = enable ? RECORD : 0; - enable = 1; + enable = TRUE; if (pa_modargs_get_value_boolean(ma, "playback", &enable) < 0) { pa_log("playback= expects a numeric argument."); goto fail; -- cgit