diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/pulsecore/protocol-simple.c | 126 | 
1 files changed, 87 insertions, 39 deletions
diff --git a/src/pulsecore/protocol-simple.c b/src/pulsecore/protocol-simple.c index 8ec38fe4..cbe48440 100644 --- a/src/pulsecore/protocol-simple.c +++ b/src/pulsecore/protocol-simple.c @@ -32,6 +32,7 @@  #include <string.h>  #include <pulse/xmalloc.h> +#include <pulse/timeval.h>  #include <pulsecore/sink-input.h>  #include <pulsecore/source-output.h> @@ -42,6 +43,7 @@  #include <pulsecore/core-error.h>  #include <pulsecore/atomic.h>  #include <pulsecore/thread-mq.h> +#include <pulsecore/core-util.h>  #include "protocol-simple.h" @@ -57,12 +59,13 @@ typedef struct connection {      pa_client *client;      pa_memblockq *input_memblockq, *output_memblockq; -    int dead; +    pa_bool_t dead;      struct {          pa_memblock *current_memblock; -        size_t memblock_index, fragment_size; +        size_t memblock_index;          pa_atomic_t missing; +        pa_bool_t underrun;      } playback;  } connection; @@ -101,7 +104,8 @@ enum {  #define PLAYBACK_BUFFER_SECONDS (.5)  #define PLAYBACK_BUFFER_FRAGMENTS (10)  #define RECORD_BUFFER_SECONDS (5) -#define RECORD_BUFFER_FRAGMENTS (100) +#define DEFAULT_SINK_LATENCY (300*PA_USEC_PER_MSEC) +#define DEFAULT_SOURCE_LATENCY (300*PA_USEC_PER_MSEC)  static void connection_unlink(connection *c) {      pa_assert(c); @@ -140,8 +144,6 @@ static void connection_free(pa_object *o) {      connection *c = CONNECTION(o);      pa_assert(c); -    connection_unref(c); -      if (c->playback.current_memblock)          pa_memblock_unref(c->playback.current_memblock); @@ -158,27 +160,33 @@ static int do_read(connection *c) {      ssize_t r;      size_t l;      void *p; +    size_t space;      connection_assert_ref(c);      if (!c->sink_input || (l = pa_atomic_load(&c->playback.missing)) <= 0)          return 0; -    if (l > c->playback.fragment_size) -        l = c->playback.fragment_size; +    if (c->playback.current_memblock) { -    if (c->playback.current_memblock) -        if (pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index < l) { +        space = pa_memblock_get_length(c->playback.current_memblock) - c->playback.memblock_index; + +        if (space <= 0) {              pa_memblock_unref(c->playback.current_memblock);              c->playback.current_memblock = NULL; -            c->playback.memblock_index = 0;          } +    }      if (!c->playback.current_memblock) { -        pa_assert_se(c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, l)); +        pa_assert_se(c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, 0));          c->playback.memblock_index = 0; + +        space = pa_memblock_get_length(c->playback.current_memblock);      } +    if (l > space) +        l = space; +      p = pa_memblock_acquire(c->playback.current_memblock);      r = pa_iochannel_read(c->io, (uint8_t*) p + c->playback.memblock_index, l);      pa_memblock_release(c->playback.current_memblock); @@ -248,16 +256,16 @@ static void do_work(connection *c) {      if (c->dead)          return; -    if (pa_iochannel_is_readable(c->io)) { +    if (pa_iochannel_is_readable(c->io))          if (do_read(c) < 0)              goto fail; -    } else if (pa_iochannel_is_hungup(c->io)) + +    if (!c->sink_input && pa_iochannel_is_hungup(c->io))          goto fail; -    if (pa_iochannel_is_writable(c->io)) { +    if (pa_iochannel_is_writable(c->io))          if (do_write(c) < 0)              goto fail; -    }      return; @@ -266,7 +274,7 @@ fail:      if (c->sink_input) {          /* If there is a sink input, we first drain what we already have read before shutting down the connection */ -        c->dead = 1; +        c->dead = TRUE;          pa_iochannel_free(c->io);          c->io = NULL; @@ -318,15 +326,19 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int              /* New data from the main loop */              pa_memblockq_push_align(c->input_memblockq, chunk); +            if (pa_memblockq_is_readable(c->input_memblockq) && c->playback.underrun) { +                pa_log_debug("Requesting rewind due to end of underrun."); +                pa_sink_input_request_rewind(c->sink_input, 0, FALSE, TRUE); +            } +  /*             pa_log("got data, %u", pa_memblockq_get_length(c->input_memblockq)); */              return 0;          } -        case SINK_INPUT_MESSAGE_DISABLE_PREBUF: { +        case SINK_INPUT_MESSAGE_DISABLE_PREBUF:              pa_memblockq_prebuf_disable(c->input_memblockq);              return 0; -        }          case PA_SINK_INPUT_MESSAGE_GET_LATENCY: {              pa_usec_t *r = userdata; @@ -345,32 +357,60 @@ static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int  /* Called from thread context */  static int sink_input_pop_cb(pa_sink_input *i, size_t length, pa_memchunk *chunk) {      connection *c; -    int r; -    pa_assert(i); +    pa_sink_input_assert_ref(i);      c = CONNECTION(i->userdata);      connection_assert_ref(c);      pa_assert(chunk); -    if ((r = pa_memblockq_peek(c->input_memblockq, chunk)) < 0) { +    if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) { + +        c->playback.underrun = TRUE;          if (c->dead && pa_sink_input_safe_to_remove(i))              pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_UNLINK_CONNECTION, NULL, 0, NULL, NULL); +        return -1;      } else { -        size_t old, new; +        size_t m; + +        c->playback.underrun = FALSE; -        old = pa_memblockq_missing(c->input_memblockq);          pa_memblockq_drop(c->input_memblockq, chunk->length); -        new = pa_memblockq_missing(c->input_memblockq); +        m = pa_memblockq_pop_missing(c->input_memblockq); -        if (new > old) { -            if (pa_atomic_add(&c->playback.missing, new - old) <= 0) +        if (m > 0) +            if (pa_atomic_add(&c->playback.missing, m) <= 0)                  pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL); -        } + +        return 0;      } +} + +/* Called from thread context */ +static void sink_input_process_rewind_cb(pa_sink_input *i, size_t nbytes) { +    connection *c; -    return r; +    pa_sink_input_assert_ref(i); +    c = CONNECTION(i->userdata); +    connection_assert_ref(c); + +    /* If we are in an underrun, then we don't rewind */ +    if (i->thread_info.underrun_for > 0) +        return; + +    pa_memblockq_rewind(c->input_memblockq, nbytes); +} + +/* Called from thread context */ +static void sink_input_update_max_rewind_cb(pa_sink_input *i, size_t nbytes) { +    connection *c; + +    pa_sink_input_assert_ref(i); +    c = CONNECTION(i->userdata); +    connection_assert_ref(c); + +    pa_memblockq_set_maxrewind(c->input_memblockq, nbytes);  }  /* Called from main context */ @@ -386,7 +426,7 @@ static void sink_input_kill_cb(pa_sink_input *i) {  static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) {      connection *c; -    pa_assert(o); +    pa_source_output_assert_ref(o);      c = CONNECTION(o->userdata);      pa_assert(c);      pa_assert(chunk); @@ -405,7 +445,7 @@ static void source_output_kill_cb(pa_source_output *o) {  static pa_usec_t source_output_get_latency_cb(pa_source_output *o) {      connection*c; -    pa_assert(o); +    pa_source_output_assert_ref(o);      c = CONNECTION(o->userdata);      pa_assert(c); @@ -440,7 +480,7 @@ static void io_callback(pa_iochannel*io, void *userdata) {  static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) {      pa_protocol_simple *p = userdata;      connection *c = NULL; -    char cname[256]; +    char cname[256], pname[128];      pa_assert(s);      pa_assert(io); @@ -462,12 +502,14 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)      c->protocol = p;      c->playback.current_memblock = NULL;      c->playback.memblock_index = 0; -    c->playback.fragment_size = 0; -    c->dead = 0; +    c->dead = FALSE; +    c->playback.underrun = TRUE;      pa_atomic_store(&c->playback.missing, 0); -    pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname)); +    pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname)); +    pa_snprintf(cname, sizeof(cname), "Simple client (%s)", pname);      pa_assert_se(c->client = pa_client_new(p->core, __FILE__, cname)); +    pa_proplist_sets(c->client->proplist, "simple-protocol.peer", pname);      c->client->module = p->module;      c->client->kill = client_kill_cb;      c->client->userdata = c; @@ -500,21 +542,24 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)          c->sink_input->parent.process_msg = sink_input_process_msg;          c->sink_input->pop = sink_input_pop_cb; +        c->sink_input->process_rewind = sink_input_process_rewind_cb; +        c->sink_input->update_max_rewind = sink_input_update_max_rewind_cb;          c->sink_input->kill = sink_input_kill_cb;          c->sink_input->userdata = c; +        pa_sink_input_set_requested_latency(c->sink_input, DEFAULT_SINK_LATENCY); +          l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS);          c->input_memblockq = pa_memblockq_new(                  0,                  l, -                0, +                l,                  pa_frame_size(&p->sample_spec),                  (size_t) -1,                  l/PLAYBACK_BUFFER_FRAGMENTS,                  0,                  NULL); -        pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5); -        c->playback.fragment_size = l/PLAYBACK_BUFFER_FRAGMENTS; +        pa_iochannel_socket_set_rcvbuf(io, l);          pa_atomic_store(&c->playback.missing, pa_memblockq_missing(c->input_memblockq)); @@ -551,6 +596,8 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)          c->source_output->get_latency = source_output_get_latency_cb;          c->source_output->userdata = c; +        pa_source_output_set_requested_latency(c->source_output, DEFAULT_SOURCE_LATENCY); +          l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS);          c->output_memblockq = pa_memblockq_new(                  0, @@ -561,7 +608,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)                  0,                  0,                  NULL); -        pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2); +        pa_iochannel_socket_set_sndbuf(io, l);          pa_source_output_put(c->source_output);      } @@ -582,6 +629,7 @@ pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *serv      pa_assert(core);      pa_assert(server); +    pa_assert(m);      pa_assert(ma);      p = pa_xnew0(pa_protocol_simple, 1); @@ -606,7 +654,7 @@ pa_protocol_simple* pa_protocol_simple_new(pa_core *core, pa_socket_server *serv      }      p->mode = enable ? RECORD : 0; -    enable = 1; +    enable = TRUE;      if (pa_modargs_get_value_boolean(ma, "playback", &enable) < 0) {          pa_log("playback= expects a numeric argument.");          goto fail;  | 
