summaryrefslogtreecommitdiffstats
path: root/src/protocol-simple.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol-simple.c')
-rw-r--r--src/protocol-simple.c21
1 files changed, 16 insertions, 5 deletions
diff --git a/src/protocol-simple.c b/src/protocol-simple.c
index 1803936e..f779a56a 100644
--- a/src/protocol-simple.c
+++ b/src/protocol-simple.c
@@ -56,6 +56,7 @@ static void destroy_connection(struct connection *c) {
static int do_read(struct connection *c) {
struct memchunk chunk;
ssize_t r;
+ uint32_t u1, u2;
if (!iochannel_is_readable(c->io))
return 0;
@@ -66,8 +67,6 @@ static int do_read(struct connection *c) {
chunk.memblock = memblock_new(BUFSIZE);
assert(chunk.memblock);
- memblock_stamp(chunk.memblock);
-
if ((r = iochannel_read(c->io, chunk.memblock->data, BUFSIZE)) <= 0) {
fprintf(stderr, "read(): %s\n", r == 0 ? "EOF" : strerror(errno));
memblock_unref(chunk.memblock);
@@ -82,6 +81,13 @@ static int do_read(struct connection *c) {
memblockq_push(c->input_memblockq, &chunk, 0);
memblock_unref(chunk.memblock);
sink_notify(c->sink_input->sink);
+
+
+ u1 = memblockq_get_latency(c->input_memblockq);
+ u2 = sink_get_latency(c->sink_input->sink);
+
+ fprintf(stderr, "latency: %u+%u=%u\r", u1, u2, u1+u2);
+
return 0;
}
@@ -96,7 +102,9 @@ static int do_write(struct connection *c) {
return 0;
assert(c->output_memblockq);
- memblockq_peek(c->output_memblockq, &chunk);
+ if (memblockq_peek(c->output_memblockq, &chunk) < 0)
+ return 0;
+
assert(chunk.memblock && chunk.length);
if ((r = iochannel_write(c->io, chunk.memblock->data+chunk.index, chunk.length)) < 0) {
@@ -145,6 +153,9 @@ static void source_output_push_cb(struct source_output *o, struct memchunk *chun
assert(o && c && chunk);
memblockq_push(c->output_memblockq, chunk, 0);
+
+ if (do_write(c) < 0)
+ destroy_connection(c);
}
static void source_output_kill_cb(struct source_output *o) {
@@ -205,7 +216,7 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
c->source_output->kill = source_output_kill_cb;
c->source_output->userdata = c;
- l = 5*bytes_per_second(&DEFAULT_SAMPLE_SPEC);
+ l = 5*bytes_per_second(&DEFAULT_SAMPLE_SPEC); /* 5s */
c->output_memblockq = memblockq_new(l, sample_size(&DEFAULT_SAMPLE_SPEC), l/2);
}
@@ -225,7 +236,7 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
c->sink_input->kill = sink_input_kill_cb;
c->sink_input->userdata = c;
- l = 5*bytes_per_second(&DEFAULT_SAMPLE_SPEC);
+ l = bytes_per_second(&DEFAULT_SAMPLE_SPEC)/2; /* half a second */
c->input_memblockq = memblockq_new(l, sample_size(&DEFAULT_SAMPLE_SPEC), l/2);
}