diff options
Diffstat (limited to 'src/polypcore/protocol-simple.c')
-rw-r--r-- | src/polypcore/protocol-simple.c | 66 |
1 files changed, 52 insertions, 14 deletions
diff --git a/src/polypcore/protocol-simple.c b/src/polypcore/protocol-simple.c index 4d3f8e1d..fac54239 100644 --- a/src/polypcore/protocol-simple.c +++ b/src/polypcore/protocol-simple.c @@ -52,6 +52,8 @@ struct connection { pa_memblockq *input_memblockq, *output_memblockq; pa_defer_event *defer_event; + int dead; + struct { pa_memblock *current_memblock; size_t memblock_index, fragment_size; @@ -130,7 +132,7 @@ static int do_read(struct connection *c) { } if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) { - pa_log(__FILE__": read() failed: %s\n", r == 0 ? "EOF" : strerror(errno)); + pa_log_debug(__FILE__": read() failed: %s\n", r == 0 ? "EOF" : strerror(errno)); return -1; } @@ -142,7 +144,7 @@ static int do_read(struct connection *c) { c->playback.memblock_index += r; assert(c->input_memblockq); - pa_memblockq_push_align(c->input_memblockq, &chunk, 0); + pa_memblockq_push_align(c->input_memblockq, &chunk); assert(c->sink_input); pa_sink_notify(c->sink_input->sink); @@ -170,32 +172,46 @@ static int do_write(struct connection *c) { pa_memblockq_drop(c->output_memblockq, &chunk, r); pa_memblock_unref(chunk.memblock); + + pa_source_notify(c->source_output->source); return 0; } - static void do_work(struct connection *c) { assert(c); assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); c->protocol->core->mainloop->defer_enable(c->defer_event, 0); - if (pa_iochannel_is_writable(c->io)) - if (do_write(c) < 0) - goto fail; + if (c->dead) + return; - if (pa_iochannel_is_readable(c->io)) + if (pa_iochannel_is_readable(c->io)) { if (do_read(c) < 0) goto fail; + } else if (pa_iochannel_is_hungup(c->io)) + goto fail; - if (pa_iochannel_is_hungup(c->io)) - c->protocol->core->mainloop->defer_enable(c->defer_event, 1); + if (pa_iochannel_is_writable(c->io)) { + if (do_write(c) < 0) + goto fail; + } return; fail: - connection_free(c); + + if (c->sink_input) { + c->dead = 1; + + pa_iochannel_free(c->io); + c->io = NULL; + + pa_memblockq_prebuf_disable(c->input_memblockq); + pa_sink_notify(c->sink_input->sink); + } else + connection_free(c); } /*** sink_input callbacks ***/ @@ -205,8 +221,13 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) { assert(i && i->userdata && chunk); c = i->userdata; - if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) + if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) { + + if (c->dead) + connection_free(c); + return -1; + } return 0; } @@ -240,7 +261,7 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk) struct connection *c = o->userdata; assert(o && c && chunk); - pa_memblockq_push(c->output_memblockq, chunk, 0); + pa_memblockq_push(c->output_memblockq, chunk); /* do something */ assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable); @@ -307,6 +328,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) c->playback.current_memblock = NULL; c->playback.memblock_index = 0; c->playback.fragment_size = 0; + c->dead = 0; pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname)); c->client = pa_client_new(p->core, __FILE__, cname); @@ -339,7 +361,15 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) c->sink_input->userdata = c; l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS); - c->input_memblockq = pa_memblockq_new(l, 0, pa_frame_size(&p->sample_spec), l/2, l/PLAYBACK_BUFFER_FRAGMENTS, p->core->memblock_stat); + c->input_memblockq = pa_memblockq_new( + 0, + l, + 0, + pa_frame_size(&p->sample_spec), + (size_t) -1, + l/PLAYBACK_BUFFER_FRAGMENTS, + NULL, + p->core->memblock_stat); assert(c->input_memblockq); pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5); c->playback.fragment_size = l/10; @@ -368,7 +398,15 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata) c->source_output->userdata = c; l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS); - c->output_memblockq = pa_memblockq_new(l, 0, pa_frame_size(&p->sample_spec), 0, 0, p->core->memblock_stat); + c->output_memblockq = pa_memblockq_new( + 0, + l, + 0, + pa_frame_size(&p->sample_spec), + 1, + 0, + NULL, + p->core->memblock_stat); pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2); } |