summaryrefslogtreecommitdiffstats
path: root/src/polypcore/protocol-simple.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/polypcore/protocol-simple.c')
-rw-r--r--src/polypcore/protocol-simple.c66
1 files changed, 52 insertions, 14 deletions
diff --git a/src/polypcore/protocol-simple.c b/src/polypcore/protocol-simple.c
index 4d3f8e1d..fac54239 100644
--- a/src/polypcore/protocol-simple.c
+++ b/src/polypcore/protocol-simple.c
@@ -52,6 +52,8 @@ struct connection {
pa_memblockq *input_memblockq, *output_memblockq;
pa_defer_event *defer_event;
+ int dead;
+
struct {
pa_memblock *current_memblock;
size_t memblock_index, fragment_size;
@@ -130,7 +132,7 @@ static int do_read(struct connection *c) {
}
if ((r = pa_iochannel_read(c->io, (uint8_t*) c->playback.current_memblock->data+c->playback.memblock_index, l)) <= 0) {
- pa_log(__FILE__": read() failed: %s\n", r == 0 ? "EOF" : strerror(errno));
+ pa_log_debug(__FILE__": read() failed: %s\n", r == 0 ? "EOF" : strerror(errno));
return -1;
}
@@ -142,7 +144,7 @@ static int do_read(struct connection *c) {
c->playback.memblock_index += r;
assert(c->input_memblockq);
- pa_memblockq_push_align(c->input_memblockq, &chunk, 0);
+ pa_memblockq_push_align(c->input_memblockq, &chunk);
assert(c->sink_input);
pa_sink_notify(c->sink_input->sink);
@@ -170,32 +172,46 @@ static int do_write(struct connection *c) {
pa_memblockq_drop(c->output_memblockq, &chunk, r);
pa_memblock_unref(chunk.memblock);
+
+ pa_source_notify(c->source_output->source);
return 0;
}
-
static void do_work(struct connection *c) {
assert(c);
assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
c->protocol->core->mainloop->defer_enable(c->defer_event, 0);
- if (pa_iochannel_is_writable(c->io))
- if (do_write(c) < 0)
- goto fail;
+ if (c->dead)
+ return;
- if (pa_iochannel_is_readable(c->io))
+ if (pa_iochannel_is_readable(c->io)) {
if (do_read(c) < 0)
goto fail;
+ } else if (pa_iochannel_is_hungup(c->io))
+ goto fail;
- if (pa_iochannel_is_hungup(c->io))
- c->protocol->core->mainloop->defer_enable(c->defer_event, 1);
+ if (pa_iochannel_is_writable(c->io)) {
+ if (do_write(c) < 0)
+ goto fail;
+ }
return;
fail:
- connection_free(c);
+
+ if (c->sink_input) {
+ c->dead = 1;
+
+ pa_iochannel_free(c->io);
+ c->io = NULL;
+
+ pa_memblockq_prebuf_disable(c->input_memblockq);
+ pa_sink_notify(c->sink_input->sink);
+ } else
+ connection_free(c);
}
/*** sink_input callbacks ***/
@@ -205,8 +221,13 @@ static int sink_input_peek_cb(pa_sink_input *i, pa_memchunk *chunk) {
assert(i && i->userdata && chunk);
c = i->userdata;
- if (pa_memblockq_peek(c->input_memblockq, chunk) < 0)
+ if (pa_memblockq_peek(c->input_memblockq, chunk) < 0) {
+
+ if (c->dead)
+ connection_free(c);
+
return -1;
+ }
return 0;
}
@@ -240,7 +261,7 @@ static void source_output_push_cb(pa_source_output *o, const pa_memchunk *chunk)
struct connection *c = o->userdata;
assert(o && c && chunk);
- pa_memblockq_push(c->output_memblockq, chunk, 0);
+ pa_memblockq_push(c->output_memblockq, chunk);
/* do something */
assert(c->protocol && c->protocol->core && c->protocol->core->mainloop && c->protocol->core->mainloop->defer_enable);
@@ -307,6 +328,7 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
c->playback.current_memblock = NULL;
c->playback.memblock_index = 0;
c->playback.fragment_size = 0;
+ c->dead = 0;
pa_iochannel_socket_peer_to_string(io, cname, sizeof(cname));
c->client = pa_client_new(p->core, __FILE__, cname);
@@ -339,7 +361,15 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
c->sink_input->userdata = c;
l = (size_t) (pa_bytes_per_second(&p->sample_spec)*PLAYBACK_BUFFER_SECONDS);
- c->input_memblockq = pa_memblockq_new(l, 0, pa_frame_size(&p->sample_spec), l/2, l/PLAYBACK_BUFFER_FRAGMENTS, p->core->memblock_stat);
+ c->input_memblockq = pa_memblockq_new(
+ 0,
+ l,
+ 0,
+ pa_frame_size(&p->sample_spec),
+ (size_t) -1,
+ l/PLAYBACK_BUFFER_FRAGMENTS,
+ NULL,
+ p->core->memblock_stat);
assert(c->input_memblockq);
pa_iochannel_socket_set_rcvbuf(io, l/PLAYBACK_BUFFER_FRAGMENTS*5);
c->playback.fragment_size = l/10;
@@ -368,7 +398,15 @@ static void on_connection(pa_socket_server*s, pa_iochannel *io, void *userdata)
c->source_output->userdata = c;
l = (size_t) (pa_bytes_per_second(&p->sample_spec)*RECORD_BUFFER_SECONDS);
- c->output_memblockq = pa_memblockq_new(l, 0, pa_frame_size(&p->sample_spec), 0, 0, p->core->memblock_stat);
+ c->output_memblockq = pa_memblockq_new(
+ 0,
+ l,
+ 0,
+ pa_frame_size(&p->sample_spec),
+ 1,
+ 0,
+ NULL,
+ p->core->memblock_stat);
pa_iochannel_socket_set_sndbuf(io, l/RECORD_BUFFER_FRAGMENTS*2);
}