diff options
Diffstat (limited to 'src/protocol-simple.c')
-rw-r--r-- | src/protocol-simple.c | 95 |
1 files changed, 58 insertions, 37 deletions
diff --git a/src/protocol-simple.c b/src/protocol-simple.c index ec121faa..1c462b39 100644 --- a/src/protocol-simple.c +++ b/src/protocol-simple.c @@ -66,53 +66,73 @@ static void client_kill_cb(struct client *client, void*userdata) { destroy_connection(c); } -static void io_callback(struct iochannel*io, void *userdata) { - struct connection *c = userdata; - assert(io && c); - - if (c->istream && iochannel_is_readable(io)) { - struct memchunk chunk; - ssize_t r; - - chunk.memblock = memblock_new(BUFSIZE); - assert(chunk.memblock); +static int do_read(struct connection *c) { + struct memchunk chunk; + ssize_t r; - if ((r = iochannel_read(io, chunk.memblock->data, BUFSIZE)) <= 0) { - fprintf(stderr, "read(): %s\n", r == 0 ? "EOF" : strerror(errno)); - memblock_unref(chunk.memblock); - goto fail; - } - - chunk.memblock->length = r; - chunk.length = r; - chunk.index = 0; - - memblockq_push(c->istream->memblockq, &chunk, 0); - input_stream_notify_sink(c->istream); + if (!iochannel_is_readable(c->io)) + return 0; + + if (!c->istream || !memblockq_is_writable(c->istream->memblockq, BUFSIZE)) + return 0; + + chunk.memblock = memblock_new(BUFSIZE); + assert(chunk.memblock); + + if ((r = iochannel_read(c->io, chunk.memblock->data, BUFSIZE)) <= 0) { + fprintf(stderr, "read(): %s\n", r == 0 ? "EOF" : strerror(errno)); memblock_unref(chunk.memblock); + return -1; } + + chunk.memblock->length = r; + chunk.length = r; + chunk.index = 0; + + memblockq_push(c->istream->memblockq, &chunk, 0); + input_stream_notify_sink(c->istream); + memblock_unref(chunk.memblock); + return 0; +} - if (c->ostream && iochannel_is_writable(io)) { - struct memchunk chunk; - ssize_t r; +static int do_write(struct connection *c) { + struct memchunk chunk; + ssize_t r; - memblockq_peek(c->ostream->memblockq, &chunk); - assert(chunk.memblock && chunk.length); + if (!iochannel_is_writable(c->io)) + return 0; + + if (!c->ostream) + return 0; - if ((r = iochannel_write(io, chunk.memblock->data+chunk.index, chunk.length)) < 0) { - fprintf(stderr, "write(): %s\n", strerror(errno)); - memblock_unref(chunk.memblock); - goto fail; - } - - memblockq_drop(c->ostream->memblockq, r); + memblockq_peek(c->ostream->memblockq, &chunk); + assert(chunk.memblock && chunk.length); + + if ((r = iochannel_write(c->io, chunk.memblock->data+chunk.index, chunk.length)) < 0) { + fprintf(stderr, "write(): %s\n", strerror(errno)); memblock_unref(chunk.memblock); + return -1; } + + memblockq_drop(c->ostream->memblockq, r); + memblock_unref(chunk.memblock); + return 0; +} - return; +static void io_callback(struct iochannel*io, void *userdata) { + struct connection *c = userdata; + assert(io && c && c->io == io); + + if (do_read(c) < 0 || do_write(c) < 0) + destroy_connection(c); +} + +static void istream_notify_cb(struct input_stream *i, void *userdata) { + struct connection*c = userdata; + assert(i && c && c->istream == i); -fail: - destroy_connection(c); + if (do_read(c) < 0) + destroy_connection(c); } static void on_connection(struct socket_server*s, struct iochannel *io, void *userdata) { @@ -155,6 +175,7 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us c->istream = input_stream_new(sink, &DEFAULT_SAMPLE_SPEC, c->client->name); assert(c->istream); input_stream_set_kill_callback(c->istream, istream_kill_cb, c); + input_stream_set_notify_callback(c->istream, istream_notify_cb, c); } |