diff options
Diffstat (limited to 'src/pulsecore/protocol-esound.c')
| -rw-r--r-- | src/pulsecore/protocol-esound.c | 555 | 
1 files changed, 352 insertions, 203 deletions
| diff --git a/src/pulsecore/protocol-esound.c b/src/pulsecore/protocol-esound.c index fe0b879b..74607146 100644 --- a/src/pulsecore/protocol-esound.c +++ b/src/pulsecore/protocol-esound.c @@ -29,7 +29,6 @@  #include <errno.h>  #include <string.h>  #include <stdio.h> -#include <assert.h>  #include <stdlib.h>  #include <limits.h> @@ -53,6 +52,7 @@  #include <pulsecore/core-util.h>  #include <pulsecore/core-error.h>  #include <pulsecore/ipacl.h> +#include <pulsecore/macro.h>  #include "endianmacros.h" @@ -77,7 +77,9 @@  /* This is heavily based on esound's code */ -struct connection { +typedef struct connection { +    pa_msgobject parent; +          uint32_t index;      int dead;      pa_protocol_esound *protocol; @@ -100,6 +102,7 @@ struct connection {      struct {          pa_memblock *current_memblock;          size_t memblock_index, fragment_size; +        pa_atomic_t missing;      } playback;      struct { @@ -109,46 +112,62 @@ struct connection {      } scache;      pa_time_event *auth_timeout_event; -}; +} connection; + +PA_DECLARE_CLASS(connection); +#define CONNECTION(o) (connection_cast(o)) +static PA_DEFINE_CHECK_TYPE(connection, pa_msgobject);  struct pa_protocol_esound { -    int public;      pa_module *module;      pa_core *core; +    int public;      pa_socket_server *server;      pa_idxset *connections; +          char *sink_name, *source_name;      unsigned n_player;      uint8_t esd_key[ESD_KEY_LEN];      pa_ip_acl *auth_ip_acl;  }; +enum { +    SINK_INPUT_MESSAGE_POST_DATA = PA_SINK_INPUT_MESSAGE_MAX, /* data from main loop to sink input */ +    SINK_INPUT_MESSAGE_DISABLE_PREBUF +}; + +enum { +    CONNECTION_MESSAGE_REQUEST_DATA, +    CONNECTION_MESSAGE_POST_DATA, +    CONNECTION_MESSAGE_UNLINK_CONNECTION +}; +  typedef struct proto_handler {      size_t data_length; -    int (*proc)(struct connection *c, esd_proto_t request, const void *data, size_t length); +    int (*proc)(connection *c, esd_proto_t request, const void *data, size_t length);      const char *description;  } esd_proto_handler_info_t; -static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length); +static void sink_input_drop_cb(pa_sink_input *i, size_t length);  static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk);  static void sink_input_kill_cb(pa_sink_input *i); -static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i); +static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk);  static pa_usec_t source_output_get_latency_cb(pa_source_output *o);  static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk);  static void source_output_kill_cb(pa_source_output *o); -static int esd_proto_connect(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_stream_play(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_stream_record(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_get_latency(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_server_info(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_all_info(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_stream_pan(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_sample_cache(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_sample_free_or_play(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_sample_get_id(struct connection *c, esd_proto_t request, const void *data, size_t length); -static int esd_proto_standby_or_resume(struct connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_connect(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_stream_play(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_stream_record(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_get_latency(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_server_info(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_all_info(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_stream_pan(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_sample_cache(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_sample_free_or_play(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_sample_get_id(connection *c, esd_proto_t request, const void *data, size_t length); +static int esd_proto_standby_or_resume(connection *c, esd_proto_t request, const void *data, size_t length);  /* the big map of protocol handler info */  static struct proto_handler proto_map[ESD_PROTO_MAX] = { @@ -185,25 +204,56 @@ static struct proto_handler proto_map[ESD_PROTO_MAX] = {      { 0,                              esd_proto_get_latency, "get latency" }  }; -static void connection_free(struct connection *c) { -    assert(c); -    pa_idxset_remove_by_data(c->protocol->connections, c, NULL); - -    if (c->state == ESD_STREAMING_DATA) -        c->protocol->n_player--; +static void connection_unlink(connection *c) { +    pa_assert(c); -    pa_client_free(c->client); +    if (!c->protocol) +        return;      if (c->sink_input) {          pa_sink_input_disconnect(c->sink_input);          pa_sink_input_unref(c->sink_input); +        c->sink_input = NULL;      }      if (c->source_output) {          pa_source_output_disconnect(c->source_output);          pa_source_output_unref(c->source_output); +        c->source_output = NULL;      } +    if (c->client) { +        pa_client_free(c->client); +        c->client = NULL; +    } +     +    if (c->state == ESD_STREAMING_DATA) +        c->protocol->n_player--; + +    if (c->io) { +        pa_iochannel_free(c->io); +        c->io = NULL; +    } + +    if (c->defer_event) { +        c->protocol->core->mainloop->defer_free(c->defer_event); +        c->defer_event = NULL; +    } + +    if (c->auth_timeout_event) { +        c->protocol->core->mainloop->time_free(c->auth_timeout_event); +        c->auth_timeout_event = NULL; +    } + +    pa_assert_se(pa_idxset_remove_by_data(c->protocol->connections, c, NULL) == c); +    c->protocol = NULL; +    connection_unref(c); +} + +static void connection_free(pa_object *obj) { +    connection *c = CONNECTION(obj); +    pa_assert(c); +          if (c->input_memblockq)          pa_memblockq_free(c->input_memblockq);      if (c->output_memblockq) @@ -215,54 +265,44 @@ static void connection_free(struct connection *c) {      pa_xfree(c->read_data);      pa_xfree(c->write_data); -    if (c->io) -        pa_iochannel_free(c->io); - -    if (c->defer_event) -        c->protocol->core->mainloop->defer_free(c->defer_event); -      if (c->scache.memchunk.memblock)          pa_memblock_unref(c->scache.memchunk.memblock);      pa_xfree(c->scache.name); -    if (c->auth_timeout_event) -        c->protocol->core->mainloop->time_free(c->auth_timeout_event); -      pa_xfree(c->original_name);      pa_xfree(c);  } -static void connection_write_prepare(struct connection *c, size_t length) { +static void connection_write_prepare(connection *c, size_t length) {      size_t t; -    assert(c); +    pa_assert(c);      t = c->write_data_length+length;      if (c->write_data_alloc < t)          c->write_data = pa_xrealloc(c->write_data, c->write_data_alloc = t); -    assert(c->write_data); +    pa_assert(c->write_data);  } -static void connection_write(struct connection *c, const void *data, size_t length) { +static void connection_write(connection *c, const void *data, size_t length) {      size_t i; -    assert(c); +    pa_assert(c); -    assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);      c->protocol->core->mainloop->defer_enable(c->defer_event, 1);      connection_write_prepare(c, length); -    assert(c->write_data); +    pa_assert(c->write_data);      i = c->write_data_length;      c->write_data_length += length; -    memcpy((char*)c->write_data + i, data, length); +    memcpy((uint8_t*) c->write_data + i, data, length);  }  static void format_esd2native(int format, int swap_bytes, pa_sample_spec *ss) { -    assert(ss); +    pa_assert(ss);      ss->channels = ((format & ESD_MASK_CHAN) == ESD_STEREO) ? 2 : 1;      if ((format & ESD_MASK_BITS) == ESD_BITS16) @@ -289,11 +329,13 @@ static int format_native2esd(pa_sample_spec *ss) {  /*** esound commands ***/ -static int esd_proto_connect(struct connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { +static int esd_proto_connect(connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) {      uint32_t ekey;      int ok; -    assert(length == (ESD_KEY_LEN + sizeof(uint32_t))); +    connection_assert_ref(c); +    pa_assert(data); +    pa_assert(length == (ESD_KEY_LEN + sizeof(uint32_t)));      if (!c->authorized) {          if (memcmp(data, c->protocol->esd_key, ESD_KEY_LEN) != 0) { @@ -316,7 +358,7 @@ static int esd_proto_connect(struct connection *c, PA_GCC_UNUSED esd_proto_t req      else if (ekey == ESD_SWAP_ENDIAN_KEY)          c->swap_byte_order = 1;      else { -        pa_log("client sent invalid endian key"); +        pa_log_warn("Client sent invalid endian key");          return -1;      } @@ -325,7 +367,7 @@ static int esd_proto_connect(struct connection *c, PA_GCC_UNUSED esd_proto_t req      return 0;  } -static int esd_proto_stream_play(struct connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { +static int esd_proto_stream_play(connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) {      char name[ESD_NAME_MAX], *utf8_name;      int32_t format, rate;      pa_sample_spec ss; @@ -333,15 +375,17 @@ static int esd_proto_stream_play(struct connection *c, PA_GCC_UNUSED esd_proto_t      pa_sink *sink = NULL;      pa_sink_input_new_data sdata; -    assert(c && length == (sizeof(int32_t)*2+ESD_NAME_MAX)); +    connection_assert_ref(c); +    pa_assert(data); +    pa_assert(length == (sizeof(int32_t)*2+ESD_NAME_MAX));      memcpy(&format, data, sizeof(int32_t));      format = MAYBE_INT32_SWAP(c->swap_byte_order, format); -    data = (const char*)data + sizeof(int32_t); +    data = (const char*) data + sizeof(int32_t);      memcpy(&rate, data, sizeof(int32_t));      rate = MAYBE_INT32_SWAP(c->swap_byte_order, rate); -    data = (const char*)data + sizeof(int32_t); +    data = (const char*) data + sizeof(int32_t);      ss.rate = rate;      format_esd2native(format, c->swap_byte_order, &ss); @@ -362,7 +406,7 @@ static int esd_proto_stream_play(struct connection *c, PA_GCC_UNUSED esd_proto_t      c->original_name = pa_xstrdup(name); -    assert(!c->sink_input && !c->input_memblockq); +    pa_assert(!c->sink_input && !c->input_memblockq);      pa_sink_input_new_data_init(&sdata);      sdata.sink = sink; @@ -385,22 +429,26 @@ static int esd_proto_stream_play(struct connection *c, PA_GCC_UNUSED esd_proto_t              l/PLAYBACK_BUFFER_FRAGMENTS,              NULL);      pa_iochannel_socket_set_rcvbuf(c->io, l/PLAYBACK_BUFFER_FRAGMENTS*2); -    c->playback.fragment_size = l/10; +    c->playback.fragment_size = l/PLAYBACK_BUFFER_FRAGMENTS; +    c->sink_input->parent.process_msg = sink_input_process_msg;      c->sink_input->peek = sink_input_peek_cb;      c->sink_input->drop = sink_input_drop_cb;      c->sink_input->kill = sink_input_kill_cb; -    c->sink_input->get_latency = sink_input_get_latency_cb;      c->sink_input->userdata = c;      c->state = ESD_STREAMING_DATA;      c->protocol->n_player++; +    pa_atomic_store(&c->playback.missing, pa_memblockq_missing(c->input_memblockq)); + +    pa_sink_input_put(c->sink_input); +          return 0;  } -static int esd_proto_stream_record(struct connection *c, esd_proto_t request, const void *data, size_t length) { +static int esd_proto_stream_record(connection *c, esd_proto_t request, const void *data, size_t length) {      char name[ESD_NAME_MAX], *utf8_name;      int32_t format, rate;      pa_source *source = NULL; @@ -408,15 +456,17 @@ static int esd_proto_stream_record(struct connection *c, esd_proto_t request, co      size_t l;      pa_source_output_new_data sdata; -    assert(c && length == (sizeof(int32_t)*2+ESD_NAME_MAX)); +    connection_assert_ref(c); +    pa_assert(data); +    pa_assert(length == (sizeof(int32_t)*2+ESD_NAME_MAX));      memcpy(&format, data, sizeof(int32_t));      format = MAYBE_INT32_SWAP(c->swap_byte_order, format); -    data = (const char*)data + sizeof(int32_t); +    data = (const char*) data + sizeof(int32_t);      memcpy(&rate, data, sizeof(int32_t));      rate = MAYBE_INT32_SWAP(c->swap_byte_order, rate); -    data = (const char*)data + sizeof(int32_t); +    data = (const char*) data + sizeof(int32_t);      ss.rate = rate;      format_esd2native(format, c->swap_byte_order, &ss); @@ -436,7 +486,7 @@ static int esd_proto_stream_record(struct connection *c, esd_proto_t request, co              return -1;          }      } else { -        assert(request == ESD_PROTO_STREAM_REC); +        pa_assert(request == ESD_PROTO_STREAM_REC);          if (c->protocol->source_name) {              if (!(source = pa_namereg_get(c->protocol->core, c->protocol->source_name, PA_NAMEREG_SOURCE, 1))) { @@ -455,7 +505,7 @@ static int esd_proto_stream_record(struct connection *c, esd_proto_t request, co      c->original_name = pa_xstrdup(name); -    assert(!c->output_memblockq && !c->source_output); +    pa_assert(!c->output_memblockq && !c->source_output);      pa_source_output_new_data_init(&sdata);      sdata.source = source; @@ -488,14 +538,18 @@ static int esd_proto_stream_record(struct connection *c, esd_proto_t request, co      c->protocol->n_player++; +    pa_source_output_put(c->source_output); +      return 0;  } -static int esd_proto_get_latency(struct connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { +static int esd_proto_get_latency(connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) {      pa_sink *sink;      int32_t latency; -    assert(c && !data && length == 0); +    connection_ref(c); +    pa_assert(!data); +    pa_assert(length == 0);      if (!(sink = pa_namereg_get(c->protocol->core, c->protocol->sink_name, PA_NAMEREG_SINK, 1)))          latency = 0; @@ -509,12 +563,14 @@ static int esd_proto_get_latency(struct connection *c, PA_GCC_UNUSED esd_proto_t      return 0;  } -static int esd_proto_server_info(struct connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { +static int esd_proto_server_info(connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) {      int32_t rate = 44100, format = ESD_STEREO|ESD_BITS16;      int32_t response;      pa_sink *sink; -    assert(c && data && length == sizeof(int32_t)); +    connection_ref(c); +    pa_assert(data); +    pa_assert(length == sizeof(int32_t));      if ((sink = pa_namereg_get(c->protocol->core, c->protocol->sink_name, PA_NAMEREG_SINK, 1))) {          rate = sink->sample_spec.rate; @@ -533,14 +589,16 @@ static int esd_proto_server_info(struct connection *c, PA_GCC_UNUSED esd_proto_t      return 0;  } -static int esd_proto_all_info(struct connection *c, esd_proto_t request, const void *data, size_t length) { +static int esd_proto_all_info(connection *c, esd_proto_t request, const void *data, size_t length) {      size_t t, k, s; -    struct connection *conn; +    connection *conn;      uint32_t idx = PA_IDXSET_INVALID;      unsigned nsamples;      char terminator[sizeof(int32_t)*6+ESD_NAME_MAX]; -    assert(c && data && length == sizeof(int32_t)); +    connection_ref(c); +    pa_assert(data); +    pa_assert(length == sizeof(int32_t));      if (esd_proto_server_info(c, request, data, length) < 0)          return -1; @@ -561,7 +619,7 @@ static int esd_proto_all_info(struct connection *c, esd_proto_t request, const v          if (conn->state != ESD_STREAMING_DATA)              continue; -        assert(t >= k*2+s); +        pa_assert(t >= k*2+s);          if (conn->sink_input) {              pa_cvolume volume = *pa_sink_input_get_volume(conn->sink_input); @@ -602,7 +660,7 @@ static int esd_proto_all_info(struct connection *c, esd_proto_t request, const v          t -= k;      } -    assert(t == s*(nsamples+1)+k); +    pa_assert(t == s*(nsamples+1)+k);      t -= k;      connection_write(c, terminator, k); @@ -615,7 +673,7 @@ static int esd_proto_all_info(struct connection *c, esd_proto_t request, const v              int32_t id, rate, lvolume, rvolume, format, len;              char name[ESD_NAME_MAX]; -            assert(t >= s*2); +            pa_assert(t >= s*2);              /* id */              id = MAYBE_INT32_SWAP(c->swap_byte_order, (int) (ce->index+1)); @@ -653,19 +711,21 @@ static int esd_proto_all_info(struct connection *c, esd_proto_t request, const v          }      } -    assert(t == s); +    pa_assert(t == s);      connection_write(c, terminator, s);      return 0;  } -static int esd_proto_stream_pan(struct connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { +static int esd_proto_stream_pan(connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) {      int32_t ok;      uint32_t idx, lvolume, rvolume; -    struct connection *conn; +    connection *conn; -    assert(c && data && length == sizeof(int32_t)*3); +    connection_assert_ref(c); +    pa_assert(data); +    pa_assert(length == sizeof(int32_t)*3);      memcpy(&idx, data, sizeof(uint32_t));      idx = MAYBE_UINT32_SWAP(c->swap_byte_order, idx) - 1; @@ -694,13 +754,15 @@ static int esd_proto_stream_pan(struct connection *c, PA_GCC_UNUSED esd_proto_t      return 0;  } -static int esd_proto_sample_cache(struct connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { +static int esd_proto_sample_cache(connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) {      pa_sample_spec ss;      int32_t format, rate, sc_length;      uint32_t idx;      char name[ESD_NAME_MAX+sizeof(SCACHE_PREFIX)-1]; -    assert(c && data && length == (ESD_NAME_MAX+3*sizeof(int32_t))); +    connection_assert_ref(c); +    pa_assert(data); +    pa_assert(length == (ESD_NAME_MAX+3*sizeof(int32_t)));      memcpy(&format, data, sizeof(int32_t));      format = MAYBE_INT32_SWAP(c->swap_byte_order, format); @@ -727,12 +789,12 @@ static int esd_proto_sample_cache(struct connection *c, PA_GCC_UNUSED esd_proto_      CHECK_VALIDITY(pa_utf8_valid(name), "Invalid UTF8 in sample name."); -    assert(!c->scache.memchunk.memblock); +    pa_assert(!c->scache.memchunk.memblock);      c->scache.memchunk.memblock = pa_memblock_new(c->protocol->core->mempool, sc_length);      c->scache.memchunk.index = 0;      c->scache.memchunk.length = sc_length;      c->scache.sample_spec = ss; -    assert(!c->scache.name); +    pa_assert(!c->scache.name);      c->scache.name = pa_xstrdup(name);      c->state = ESD_CACHING_SAMPLE; @@ -745,12 +807,14 @@ static int esd_proto_sample_cache(struct connection *c, PA_GCC_UNUSED esd_proto_      return 0;  } -static int esd_proto_sample_get_id(struct connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) { +static int esd_proto_sample_get_id(connection *c, PA_GCC_UNUSED esd_proto_t request, const void *data, size_t length) {      int32_t ok;      uint32_t idx;      char name[ESD_NAME_MAX+sizeof(SCACHE_PREFIX)-1]; -    assert(c && data && length == ESD_NAME_MAX); +    connection_assert_ref(c); +    pa_assert(data); +    pa_assert(length == ESD_NAME_MAX);      strcpy(name, SCACHE_PREFIX);      strncpy(name+sizeof(SCACHE_PREFIX)-1, data, ESD_NAME_MAX); @@ -767,12 +831,14 @@ static int esd_proto_sample_get_id(struct connection *c, PA_GCC_UNUSED esd_proto      return 0;  } -static int esd_proto_sample_free_or_play(struct connection *c, esd_proto_t request, const void *data, size_t length) { +static int esd_proto_sample_free_or_play(connection *c, esd_proto_t request, const void *data, size_t length) {      int32_t ok;      const char *name;      uint32_t idx; -    assert(c && data && length == sizeof(int32_t)); +    connection_assert_ref(c); +    pa_assert(data); +    pa_assert(length == sizeof(int32_t));      memcpy(&idx, data, sizeof(uint32_t));      idx = MAYBE_UINT32_SWAP(c->swap_byte_order, idx) - 1; @@ -787,7 +853,7 @@ static int esd_proto_sample_free_or_play(struct connection *c, esd_proto_t reque                  if (pa_scache_play_item(c->protocol->core, name, sink, PA_VOLUME_NORM) >= 0)                      ok = idx + 1;          } else { -            assert(request == ESD_PROTO_SAMPLE_FREE); +            pa_assert(request == ESD_PROTO_SAMPLE_FREE);              if (pa_scache_remove_item(c->protocol->core, name) >= 0)                  ok = idx + 1; @@ -799,9 +865,11 @@ static int esd_proto_sample_free_or_play(struct connection *c, esd_proto_t reque      return 0;  } -static int esd_proto_standby_or_resume(struct connection *c, PA_GCC_UNUSED esd_proto_t request, PA_GCC_UNUSED const void *data, PA_GCC_UNUSED size_t length) { +static int esd_proto_standby_or_resume(connection *c, PA_GCC_UNUSED esd_proto_t request, PA_GCC_UNUSED const void *data, PA_GCC_UNUSED size_t length) {      int32_t ok; +    connection_assert_ref(c); +      connection_write_prepare(c, sizeof(int32_t) * 2);      ok = 1; @@ -814,20 +882,21 @@ static int esd_proto_standby_or_resume(struct connection *c, PA_GCC_UNUSED esd_p  /*** client callbacks ***/  static void client_kill_cb(pa_client *c) { -    assert(c && c->userdata); -    connection_free(c->userdata); +    pa_assert(c); +     +    connection_unlink(CONNECTION(c->userdata));  }  /*** pa_iochannel callbacks ***/ -static int do_read(struct connection *c) { -    assert(c && c->io); +static int do_read(connection *c) { +    connection_assert_ref(c);  /*      pa_log("READ");  */      if (c->state == ESD_NEXT_REQUEST) {          ssize_t r; -        assert(c->read_data_length < sizeof(c->request)); +        pa_assert(c->read_data_length < sizeof(c->request));          if ((r = pa_iochannel_read(c->io, ((uint8_t*) &c->request) + c->read_data_length, sizeof(c->request) - c->read_data_length)) <= 0) {              pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF"); @@ -862,7 +931,7 @@ static int do_read(struct connection *c) {              } else {                  if (c->read_data_alloc < handler->data_length)                      c->read_data = pa_xrealloc(c->read_data, c->read_data_alloc = handler->data_length); -                assert(c->read_data); +                pa_assert(c->read_data);                  c->state = ESD_NEEDS_REQDATA;                  c->read_data_length = 0; @@ -873,18 +942,21 @@ static int do_read(struct connection *c) {          ssize_t r;          struct proto_handler *handler = proto_map+c->request; -        assert(handler->proc); +        pa_assert(handler->proc); -        assert(c->read_data && c->read_data_length < handler->data_length); +        pa_assert(c->read_data && c->read_data_length < handler->data_length);          if ((r = pa_iochannel_read(c->io, (uint8_t*) c->read_data + c->read_data_length, handler->data_length - c->read_data_length)) <= 0) { +            if (errno == EINTR || errno == EAGAIN) +                return 0; +                          pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");              return -1;          }          if ((c->read_data_length += r) >= handler->data_length) {              size_t l = c->read_data_length; -            assert(handler->proc); +            pa_assert(handler->proc);              c->state = ESD_NEXT_REQUEST;              c->read_data_length = 0; @@ -896,22 +968,24 @@ static int do_read(struct connection *c) {          ssize_t r;          void *p; -        assert(c->scache.memchunk.memblock); -        assert(c->scache.name); -        assert(c->scache.memchunk.index < c->scache.memchunk.length); +        pa_assert(c->scache.memchunk.memblock); +        pa_assert(c->scache.name); +        pa_assert(c->scache.memchunk.index < c->scache.memchunk.length);          p = pa_memblock_acquire(c->scache.memchunk.memblock); - -        if ((r = pa_iochannel_read(c->io, (uint8_t*) p+c->scache.memchunk.index, c->scache.memchunk.length-c->scache.memchunk.index)) <= 0) { -            pa_memblock_release(c->scache.memchunk.memblock); +        r = pa_iochannel_read(c->io, (uint8_t*) p+c->scache.memchunk.index, c->scache.memchunk.length-c->scache.memchunk.index); +        pa_memblock_release(c->scache.memchunk.memblock); +         +        if (r <= 0) { +            if (errno == EINTR || errno == EAGAIN) +                return 0; +                          pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");              return -1;          } -        pa_memblock_release(c->scache.memchunk.memblock); -          c->scache.memchunk.index += r; -        assert(c->scache.memchunk.index <= c->scache.memchunk.length); +        pa_assert(c->scache.memchunk.index <= c->scache.memchunk.length);          if (c->scache.memchunk.index == c->scache.memchunk.length) {              uint32_t idx; @@ -938,11 +1012,11 @@ static int do_read(struct connection *c) {          size_t l;          void *p; -        assert(c->input_memblockq); +        pa_assert(c->input_memblockq);  /*         pa_log("STREAMING_DATA"); */ -        if (!(l = pa_memblockq_missing(c->input_memblockq))) +        if (!(l = pa_atomic_load(&c->playback.missing)))              return 0;          if (l > c->playback.fragment_size) @@ -956,47 +1030,50 @@ static int do_read(struct connection *c) {              }          if (!c->playback.current_memblock) { -            c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2); -            assert(c->playback.current_memblock); -            assert(pa_memblock_get_length(c->playback.current_memblock) >= l); +            pa_assert_se(c->playback.current_memblock = pa_memblock_new(c->protocol->core->mempool, c->playback.fragment_size*2));              c->playback.memblock_index = 0;          }          p = pa_memblock_acquire(c->playback.current_memblock); +        r = pa_iochannel_read(c->io, (uint8_t*) p+c->playback.memblock_index, l); +        pa_memblock_release(c->playback.current_memblock); +         +        if (r <= 0) { +             +            if (errno == EINTR || errno == EAGAIN) +                return 0; -        if ((r = pa_iochannel_read(c->io, (uint8_t*) p+c->playback.memblock_index, l)) <= 0) { -            pa_memblock_release(c->playback.current_memblock);              pa_log_debug("read(): %s", r < 0 ? pa_cstrerror(errno) : "EOF");              return -1;          } -        pa_memblock_release(c->playback.current_memblock);          chunk.memblock = c->playback.current_memblock;          chunk.index = c->playback.memblock_index;          chunk.length = r; -        assert(chunk.memblock);          c->playback.memblock_index += r; -        assert(c->input_memblockq); -        pa_memblockq_push_align(c->input_memblockq, &chunk); -        assert(c->sink_input); -        pa_sink_notify(c->sink_input->sink); +        pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_POST_DATA, NULL, 0, &chunk, NULL); +        pa_atomic_sub(&c->playback.missing, r);      }      return 0;  } -static int do_write(struct connection *c) { -    assert(c && c->io); +static int do_write(connection *c) { +    connection_assert_ref(c);  /*     pa_log("WRITE"); */      if (c->write_data_length) {          ssize_t r; -        assert(c->write_data_index < c->write_data_length); +        pa_assert(c->write_data_index < c->write_data_length);          if ((r = pa_iochannel_write(c->io, (uint8_t*) c->write_data+c->write_data_index, c->write_data_length-c->write_data_index)) < 0) { + +            if (errno == EINTR || errno == EAGAIN) +                return 0; +                          pa_log("write(): %s", pa_cstrerror(errno));              return -1;          } @@ -1009,37 +1086,36 @@ static int do_write(struct connection *c) {          ssize_t r;          void *p; -        assert(c->output_memblockq);          if (pa_memblockq_peek(c->output_memblockq, &chunk) < 0)              return 0; -        assert(chunk.memblock); -        assert(chunk.length); +        pa_assert(chunk.memblock); +        pa_assert(chunk.length);          p = pa_memblock_acquire(chunk.memblock); +        r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length); +        pa_memblock_release(chunk.memblock); + +        pa_memblock_unref(chunk.memblock); +         +        if (r < 0) { -        if ((r = pa_iochannel_write(c->io, (uint8_t*) p+chunk.index, chunk.length)) < 0) { -            pa_memblock_release(chunk.memblock); -            pa_memblock_unref(chunk.memblock); +            if (errno == EINTR || errno == EAGAIN) +                return 0; +                      pa_log("write(): %s", pa_cstrerror(errno));              return -1;          } -        pa_memblock_release(chunk.memblock); - -        pa_memblockq_drop(c->output_memblockq, &chunk, r); -        pa_memblock_unref(chunk.memblock); - -        pa_source_notify(c->source_output->source); +        pa_memblockq_drop(c->output_memblockq, r);      }      return 0;  } -static void do_work(struct connection *c) { -    assert(c); +static void do_work(connection *c) { +    connection_assert_ref(c); -    assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);      c->protocol->core->mainloop->defer_enable(c->defer_event, 0);      if (c->dead) @@ -1070,117 +1146,188 @@ fail:          pa_iochannel_free(c->io);          c->io = NULL; -        pa_memblockq_prebuf_disable(c->input_memblockq); -        pa_sink_notify(c->sink_input->sink); +        pa_asyncmsgq_post(c->sink_input->sink->asyncmsgq, PA_MSGOBJECT(c->sink_input), SINK_INPUT_MESSAGE_DISABLE_PREBUF, NULL, 0, NULL, NULL);      } else -        connection_free(c); +        connection_unlink(c);  }  static void io_callback(pa_iochannel*io, void *userdata) { -    struct connection *c = userdata; -    assert(io && c && c->io == io); +    connection *c = CONNECTION(userdata); + +    connection_assert_ref(c); +    pa_assert(io);      do_work(c);  } -/*** defer callback ***/ -  static void defer_callback(pa_mainloop_api*a, pa_defer_event *e, void *userdata) { -    struct connection *c = userdata; -    assert(a && c && c->defer_event == e); +    connection *c = CONNECTION(userdata); -/*     pa_log("DEFER"); */ +    connection_assert_ref(c); +    pa_assert(e);      do_work(c);  } +static int connection_process_msg(pa_msgobject *o, int code, void*userdata, int64_t offset, pa_memchunk *chunk) { +    connection *c = CONNECTION(o); +    connection_assert_ref(c); + +    switch (code) { +        case CONNECTION_MESSAGE_REQUEST_DATA: +            do_work(c); +            break; +             +        case CONNECTION_MESSAGE_POST_DATA: +/*             pa_log("got data %u", chunk->length); */ +            pa_memblockq_push_align(c->output_memblockq, chunk); +            do_work(c); +            break; + +        case CONNECTION_MESSAGE_UNLINK_CONNECTION: +            connection_unlink(c); +            break; +    } + +    return 0; +} +  /*** sink_input callbacks ***/ -static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { -    struct connection*c; -    assert(i && i->userdata && chunk); -    c = i->userdata; +static int sink_input_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offset, pa_memchunk *chunk) { +    pa_sink_input *i = PA_SINK_INPUT(o); +    connection*c; -    if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) { +    pa_sink_input_assert_ref(i); +    c = CONNECTION(i->userdata); +    connection_assert_ref(c); -        if (c->dead) -            connection_free(c); +    switch (code) { -        return -1; -    } +        case SINK_INPUT_MESSAGE_POST_DATA: { +            pa_assert(chunk); -    return 0; +            /* New data from the main loop */ +            pa_memblockq_push_align(c->input_memblockq, chunk); + +/*             pa_log("got data, %u", pa_memblockq_get_length(c->input_memblockq)); */ +             +            return 0; +        } + +        case SINK_INPUT_MESSAGE_DISABLE_PREBUF: { +            pa_memblockq_prebuf_disable(c->input_memblockq); +            return 0; +        } + +        case PA_SINK_INPUT_MESSAGE_GET_LATENCY: { +            pa_usec_t *r = userdata; + +            *r = pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec); + +            /* Fall through, the default handler will add in the extra +             * latency added by the resampler */ +        } + +        default: +            return pa_sink_input_process_msg(o, code, userdata, offset, chunk); +    }  } -static void sink_input_drop_cb(pa_sink_input *i, const pa_memchunk *chunk, size_t length) { -    struct connection*c = i->userdata; -    assert(i && c && length); -/*     pa_log("DROP"); */ +static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { +    connection*c; +    int r; +     +    pa_assert(i); +    c = CONNECTION(i->userdata); +    connection_assert_ref(c); +    pa_assert(chunk); + +    if ((r = pa_memblockq_peek(c->input_memblockq, chunk)) < 0 && c->dead) +        pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_UNLINK_CONNECTION, NULL, 0, NULL, NULL); + +    return r; +} + +static void sink_input_drop_cb(pa_sink_input *i, size_t length) { +    connection*c; +    size_t old, new; -    pa_memblockq_drop(c->input_memblockq, chunk, length); +    pa_assert(i); +    c = CONNECTION(i->userdata); +    connection_assert_ref(c); +    pa_assert(length); -    /* do something */ -    assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); +    /*     pa_log("DROP"); */ -    if (!c->dead) -        c->protocol->core->mainloop->defer_enable(c->defer_event, 1); +    old = pa_memblockq_missing(c->input_memblockq); +    pa_memblockq_drop(c->input_memblockq, length); +    new = pa_memblockq_missing(c->input_memblockq); -/*     assert(pa_memblockq_get_length(c->input_memblockq) > 2048); */ +    if (new > old) { +        if (pa_atomic_add(&c->playback.missing, new - old) <= 0) +            pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_REQUEST_DATA, NULL, 0, NULL, NULL); +    }  }  static void sink_input_kill_cb(pa_sink_input *i) { -    assert(i && i->userdata); -    connection_free((struct connection *) i->userdata); -} +    pa_sink_input_assert_ref(i); -static pa_usec_t sink_input_get_latency_cb(pa_sink_input *i) { -    struct connection*c = i->userdata; -    assert(i && c); -    return pa_bytes_to_usec(pa_memblockq_get_length(c->input_memblockq), &c->sink_input->sample_spec); +    connection_unlink(CONNECTION(i->userdata));  }  /*** source_output callbacks ***/  static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) { -    struct connection *c = o->userdata; -    assert(o && c && chunk); - -    pa_memblockq_push(c->output_memblockq, chunk); +    connection *c; -    /* do something */ -    assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); +    pa_assert(o); +    c = CONNECTION(o->userdata); +    pa_assert(c); +    pa_assert(chunk); -    if (!c->dead) -        c->protocol->core->mainloop->defer_enable(c->defer_event, 1); +    pa_asyncmsgq_post(c->protocol->core->asyncmsgq, PA_MSGOBJECT(c), CONNECTION_MESSAGE_POST_DATA, NULL, 0, chunk, NULL);  }  static void source_output_kill_cb(pa_source_output *o) { -    assert(o && o->userdata); -    connection_free((struct connection *) o->userdata); +    pa_source_output_assert_ref(o); + +    connection_unlink(CONNECTION(o->userdata));  }  static pa_usec_t source_output_get_latency_cb(pa_source_output *o) { -    struct connection*c = o->userdata; -    assert(o && c); +    connection*c; + +    pa_assert(o); +    c = CONNECTION(o->userdata); +    pa_assert(c); +      return pa_bytes_to_usec(pa_memblockq_get_length(c->output_memblockq), &c->source_output->sample_spec);  }  /*** socket server callback ***/  static void auth_timeout(pa_mainloop_api*m, pa_time_event *e, const struct timeval *tv, void *userdata) { -    struct connection *c = userdata; -    assert(m && tv && c && c->auth_timeout_event == e); +    connection *c = CONNECTION(userdata); +     +    pa_assert(m); +    pa_assert(tv); +    connection_assert_ref(c); +    pa_assert(c->auth_timeout_event == e);      if (!c->authorized) -        connection_free(c); +        connection_unlink(c);  }  static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) { -    struct connection *c; +    connection *c;      pa_protocol_esound *p = userdata;      char cname[256], pname[128]; -    assert(s && io && p); +     +    pa_assert(s); +    pa_assert(io); +    pa_assert(p);      if (pa_idxset_size(p->connections)+1 > MAX_CONNECTIONS) {          pa_log("Warning! Too many connections (%u), dropping incoming connection.", MAX_CONNECTIONS); @@ -1188,16 +1335,16 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)          return;      } -    c = pa_xnew(struct connection, 1); +    c = pa_msgobject_new(connection); +    c->parent.parent.free = connection_free; +    c->parent.process_msg = connection_process_msg;      c->protocol = p;      c->io = io;      pa_iochannel_set_callback(c->io, io_callback, c);      pa_iochannel_socket_peer_to_string(io, pname, sizeof(pname));      pa_snprintf(cname, sizeof(cname), "EsounD client (%s)", pname); -    assert(p->core);      c->client = pa_client_new(p->core, __FILE__, cname); -    assert(c->client);      c->client->owner = p->module;      c->client->kill = client_kill_cb;      c->client->userdata = c; @@ -1224,6 +1371,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)      c->playback.current_memblock = NULL;      c->playback.memblock_index = 0;      c->playback.fragment_size = 0; +    pa_atomic_store(&c->playback.missing, 0);      c->scache.memchunk.length = c->scache.memchunk.index = 0;      c->scache.memchunk.memblock = NULL; @@ -1245,7 +1393,6 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)          c->auth_timeout_event = NULL;      c->defer_event = p->core->mainloop->defer_new(p->core->mainloop, defer_callback, c); -    assert(c->defer_event);      p->core->mainloop->defer_enable(c->defer_event, 0);      pa_idxset_put(p->connections, c, &c->index); @@ -1254,22 +1401,22 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)  /*** entry points ***/  pa_protocol_esound* pa_protocol_esound_new(pa_core*core, pa_socket_server *server, pa_module *m, pa_modargs *ma) { -    pa_protocol_esound *p; +    pa_protocol_esound *p = NULL;      int public = 0;      const char *acl; -    assert(core); -    assert(server); -    assert(m); -    assert(ma); - -    p = pa_xnew(pa_protocol_esound, 1); +    pa_assert(core); +    pa_assert(server); +    pa_assert(m); +    pa_assert(ma);      if (pa_modargs_get_value_boolean(ma, "auth-anonymous", &public) < 0) {          pa_log("auth-anonymous= expects a boolean argument.");          goto fail;      } +    p = pa_xnew(pa_protocol_esound, 1); +      if (pa_authkey_load_auto(pa_modargs_get_value(ma, "cookie", DEFAULT_COOKIE_FILE), p->esd_key, sizeof(p->esd_key)) < 0)          goto fail; @@ -1282,13 +1429,12 @@ pa_protocol_esound* pa_protocol_esound_new(pa_core*core, pa_socket_server *serve      } else          p->auth_ip_acl = NULL; +    p->core = core;      p->module = m;      p->public = public;      p->server = server;      pa_socket_server_set_callback(p->server, on_connection, p); -    p->core = core;      p->connections = pa_idxset_new(NULL, NULL); -    assert(p->connections);      p->sink_name = pa_xstrdup(pa_modargs_get_value(ma, "sink", NULL));      p->source_name = pa_xstrdup(pa_modargs_get_value(ma, "source", NULL)); @@ -1302,17 +1448,20 @@ fail:  }  void pa_protocol_esound_free(pa_protocol_esound *p) { -    struct connection *c; -    assert(p); +    connection *c; +    pa_assert(p);      while ((c = pa_idxset_first(p->connections, NULL))) -        connection_free(c); - +        connection_unlink(c);      pa_idxset_free(p->connections, NULL, NULL); +      pa_socket_server_unref(p->server);      if (p->auth_ip_acl)          pa_ip_acl_free(p->auth_ip_acl); +    pa_xfree(p->sink_name); +    pa_xfree(p->source_name); +      pa_xfree(p);  } | 
