diff options
Diffstat (limited to 'src/buffio.c')
-rw-r--r-- | src/buffio.c | 436 |
1 files changed, 365 insertions, 71 deletions
diff --git a/src/buffio.c b/src/buffio.c index 25ba5a4..727821d 100644 --- a/src/buffio.c +++ b/src/buffio.c @@ -3,16 +3,21 @@ #include <string.h> #include <errno.h> #include <stdio.h> +#include <sys/socket.h> #include <oop.h> #include <libdaemon/dlog.h> +#include <libdaemon/dnonblock.h> #include "buffio.h" #include "main.h" +#include "timevalarith.h" -#define BUFSIZE 10240 +//#define IODEBUG 1 +#define BUFSIZE PIPE_BUF +static void* buffio_timeout_cb(oop_source *source, struct timeval tv, void *user); static void* buffio_read_cb(oop_source *source, int fd, oop_event event, void *user); static void* buffio_write_cb(oop_source *source, int fd, oop_event event, void *user); @@ -29,6 +34,9 @@ struct buffio* buffio_new(int ifd, int ofd) { b->ifd = ifd; b->ofd = ofd; + + daemon_nonblock(b->ifd, 1); + daemon_nonblock(b->ofd, 1); b->input_buf = malloc(b->input_max_length = BUFSIZE); assert(b->input_buf); @@ -46,36 +54,99 @@ struct buffio* buffio_new(int ifd, int ofd) { return b; } -static void buffio_close_fd(struct buffio *b) { +void buffio_close_input_fd(struct buffio *b) { assert(b); - if (b->b_read_cb) + if (b->b_read_cb) { event_source->cancel_fd(event_source, b->ifd, OOP_READ); + b->b_read_cb = 0; + } - if (b->b_write_cb) + if (b->ifd >= 0) { + + if (b->ofd != b->ifd) + close(b->ifd); + + b->ifd = -1; + } +} + +void buffio_close_output_fd(struct buffio *b) { + assert(b); + + if (b->b_write_cb) { event_source->cancel_fd(event_source, b->ofd, OOP_WRITE); + b->b_write_cb = 0; + } - if (b->ifd >= 0) - close(b->ifd); - if (b->ofd >= 0 && b->ofd != b->ifd) - close(b->ofd); + if (b->ofd >= 0) { - b->b_read_cb = b->b_write_cb = 0; - b->ifd = b->ofd = -1; + if (b->ofd != b->ifd) + close(b->ofd); + + b->ofd = -1; + } } - void buffio_free(struct buffio *b) { - buffio_close_fd(b); + buffio_close_input_fd(b); + buffio_close_output_fd(b); + + if (b->delaying) + event_source->cancel_time(event_source, b->timeout_tv, buffio_timeout_cb, b); + free(b->output_buf); free(b->input_buf); free(b); } +static void* buffio_user_cb(oop_source *source, struct timeval tv, void *user) { + struct buffio *b = user; + enum buffio_sched_cb s; + + int r = 0; + assert(b && source && source == event_source); + + s = b->sched_cb; + b->sched_cb = BUFFIO_SCHED_CB_IDLE; + + if (s & BUFFIO_SCHED_CB_INPUT_READY && b->input_ready_cb && b->input_length) + r |= b->input_ready_cb(b, b->user); + if (s & BUFFIO_SCHED_CB_OUTPUT_EMPTY && b->output_empty_cb && !b->output_length) + r |= b->output_empty_cb(b, b->user); + if (s & BUFFIO_SCHED_CB_EOF && b->eof_cb) + r |= b->eof_cb(b, b->user); + if (s & BUFFIO_SCHED_CB_EPIPE && b->epipe_cb) + r |= b->epipe_cb(b, b->user); + if (s & BUFFIO_SCHED_CB_ERROR && b->error_cb) + r |= b->error_cb(b, b->user); + + return r == 0 ? OOP_CONTINUE : OOP_HALT; +} + + +static void buffio_sched_cb(struct buffio *b, enum buffio_sched_cb s) { + if (s & BUFFIO_SCHED_CB_INPUT_READY && !b->input_ready_cb) s &= ~BUFFIO_SCHED_CB_INPUT_READY; + if (s & BUFFIO_SCHED_CB_OUTPUT_EMPTY && !b->output_empty_cb) s &= ~BUFFIO_SCHED_CB_OUTPUT_EMPTY; + if (s & BUFFIO_SCHED_CB_EOF && !b->eof_cb) s &= ~BUFFIO_SCHED_CB_EOF; + if (s & BUFFIO_SCHED_CB_EPIPE && !b->epipe_cb) s = ~BUFFIO_SCHED_CB_EPIPE; + if (s & BUFFIO_SCHED_CB_ERROR && !b->error_cb) s = ~BUFFIO_SCHED_CB_ERROR; + + if (s == BUFFIO_SCHED_CB_IDLE) + return; + + if (!b->sched_cb) { + assert(event_source); + event_source->on_time(event_source, OOP_TIME_NOW, buffio_user_cb, b); + } + + b->sched_cb |= s; +} + #ifdef IODEBUG -static void esc_print(const char*c, const uint8_t *b, size_t l) { +static void esc_print(struct buffio *bio, const char*c, const uint8_t *b, size_t l) { size_t i; - fprintf(stderr, "%s", c); + fprintf(stderr, "[%p] %s", bio, c); for (i = 0; i < l; i++, b++) fputc(*b < 32 ? '.' : *b, stderr); @@ -83,7 +154,7 @@ static void esc_print(const char*c, const uint8_t *b, size_t l) { }; #endif -static void buffio_normalize(struct buffio *b) { +inline static void buffio_normalize(struct buffio *b) { assert(b); assert(b->input_index < b->input_max_length); @@ -92,14 +163,20 @@ static void buffio_normalize(struct buffio *b) { if (!b->input_length) /* This will optimize throughput a bit */ b->input_index = 0; + assert(b->output_index < b->output_max_length); + assert(b->output_length <= b->output_max_length); + if (!b->output_length) /* This will optimize throughput a bit */ b->output_index = 0; } static void buffio_set_input_callbacks(struct buffio *b) { - assert(b && b->ifd >= 0 && event_source); + assert(b && event_source); - if (b->input_length <= b->input_watermark) { + if (b->ifd == -1) + return; + + if (!b->readable && b->input_length < b->input_watermark) { if (!b->b_read_cb) { /* Enable the callback */ event_source->on_fd(event_source, b->ifd, OOP_READ, buffio_read_cb, b); b->b_read_cb = 1; @@ -113,9 +190,12 @@ static void buffio_set_input_callbacks(struct buffio *b) { } static void buffio_set_output_callbacks(struct buffio *b) { - assert(b && b->ofd >= 0 && event_source); + assert(b && event_source); - if (b->output_length >= b->output_watermark) { + if (b->ofd == -1) + return; + + if (!b->writable && b->output_length >= b->output_watermark) { if (!b->b_write_cb) { /* Enable the callback */ event_source->on_fd(event_source, b->ofd, OOP_WRITE, buffio_write_cb, b); b->b_write_cb = 1; @@ -128,11 +208,44 @@ static void buffio_set_output_callbacks(struct buffio *b) { } } -static void* buffio_read_cb(oop_source *source, int fd, oop_event event, void *user) { - struct buffio *b = user; - assert(source && b && b->ifd == fd && event == OOP_READ); +static void buffio_delay(struct buffio *b, unsigned bytes) { + assert(event_source && event_source->on_time); + + if (b->delaying) { + event_source->cancel_time(event_source, b->timeout_tv, buffio_timeout_cb, b); + b->delaying = 0; + } + + if (bytes && b->output_delay_usec) { + struct timeval now; + + gettimeofday(&now, NULL); + + /* Time when the last write operation is complete */ + b->finish_tv = timeval_add(timeval_max(b->finish_tv, now), (uint64_t) bytes * b->output_delay_usec); + + /* Time when we want to write the next data to the device */ + b->timeout_tv = timeval_sub(b->finish_tv, b->output_latency_usec); + + assert(event_source && event_source->on_time); + event_source->on_time(event_source, b->timeout_tv, buffio_timeout_cb, b); + b->delaying = 1; + +// daemon_log(LOG_INFO, "%u bytes, now = %lu|%lu; finish = %lu|%lu; timeout = %lu|%lu", bytes, now.tv_sec, now.tv_usec, b->finish_tv.tv_sec, b->finish_tv.tv_usec, b->timeout_tv.tv_sec, b->timeout_tv.tv_usec); + } +} - if (b->input_length < b->input_max_length) { +void buffio_set_delay_usec(struct buffio *b, unsigned long delay, unsigned long latency) { + assert(b); + + b->output_delay_usec = delay; + b->output_latency_usec = latency; +} + +static void do_read(struct buffio *b) { + assert(b); + + if (b->readable && b->input_length < b->input_watermark) { ssize_t s; size_t m, i; @@ -141,41 +254,47 @@ static void* buffio_read_cb(oop_source *source, int fd, oop_event event, void *u m = b->input_max_length-b->input_length; if (m > b->input_max_length-i) m = b->input_max_length-i; - - if ((s = read(fd, b->input_buf+i, m)) < 0) { + + s = read(b->ifd, b->input_buf+i, m); + b->readable = 0; + + //daemon_log(LOG_INFO, "%p: Read %u (%u) bytes.", b, s, m); + + if (s < 0) { daemon_log(LOG_ERR, "Failed to read from file descriptor: %s", strerror(errno)); - return OOP_HALT; - } + buffio_close_input_fd(b); + buffio_sched_cb(b, BUFFIO_SCHED_CB_ERROR); + return; + + } else if (!s) { + buffio_close_input_fd(b); + buffio_sched_cb(b, BUFFIO_SCHED_CB_EOF); + return; + + } else { #ifdef IODEBUG - esc_print("INPUT: ", b->input_buf+i, s); + esc_print(b, "INPUT: ", b->input_buf+i, s); #endif - - if (!s) { - buffio_close_fd(b); - if (b->eof_cb) - b->eof_cb(b, b->user); - return OOP_CONTINUE; + + assert(s <= m); + b->input_length += s; + buffio_normalize(b); } - - assert(s <= m); - b->input_length += s; } - buffio_normalize(b); buffio_set_input_callbacks(b); - if (b->input_length && b->input_ready_cb) - b->input_ready_cb(b, b->user); + if (b->input_length) + buffio_sched_cb(b, BUFFIO_SCHED_CB_INPUT_READY); - return OOP_CONTINUE; + return; } -static void* buffio_write_cb(oop_source *source, int fd, oop_event event, void *user) { - struct buffio *b = user; - assert(source && b && b->ofd == fd && event == OOP_WRITE); +static void do_write(struct buffio *b) { + assert(b); - if (b->output_length > 0) { + if (!b->delaying && b->writable && b->output_length >= b->output_watermark) { ssize_t s; size_t m; @@ -183,37 +302,73 @@ static void* buffio_write_cb(oop_source *source, int fd, oop_event event, void * if (m > b->output_max_length-b->output_index) m = b->output_max_length-b->output_index; - if ((s = write(fd, b->output_buf+b->output_index, m)) < 0) { - daemon_log(LOG_ERR, "Failed to write to file descriptor: %s", strerror(errno)); - return OOP_HALT; - } + s = write(b->ofd, b->output_buf+b->output_index, m); + b->writable = 0; -#ifdef IODEBUG - esc_print("OUTPUT: ", b->output_buf+b->output_index, s); -#endif + //daemon_log(LOG_INFO, "%p: Wrote %u (%u) bytes.", b, s, m); - if (!s) { - buffio_close_fd(b); - if (b->eof_cb) - b->eof_cb(b, b->user); - return OOP_CONTINUE; + if (s < 0) { + buffio_close_output_fd(b); + + if (errno == EPIPE) { + buffio_sched_cb(b, BUFFIO_SCHED_CB_EPIPE); + return; + + } else { + daemon_log(LOG_ERR, "Failed to write to file descriptor: %s", strerror(errno)); + buffio_sched_cb(b, BUFFIO_SCHED_CB_ERROR); + return; + } } + +#ifdef IODEBUG + esc_print(b, "OUTPUT: ", b->output_buf+b->output_index, s); +#endif - assert(s <= m); + assert(s > 0 && s <= m); b->output_index = (b->output_index + s) % b->output_max_length; b->output_length -= s; - } - buffio_normalize(b); - buffio_set_output_callbacks(b); + buffio_normalize(b); + buffio_delay(b, s); - if (!b->output_length && b->output_empty_cb) - b->output_empty_cb(b, b->user); + if (!b->output_length) + buffio_sched_cb(b, BUFFIO_SCHED_CB_OUTPUT_EMPTY); + } else + buffio_set_output_callbacks(b); + + return; +} + +static void* buffio_read_cb(oop_source *source, int fd, oop_event event, void *user) { + struct buffio *b = user; + assert(source && b && b->ifd == fd && event == OOP_READ); + + b->readable = 1; + do_read(b); return OOP_CONTINUE; +} +static void* buffio_write_cb(oop_source *source, int fd, oop_event event, void *user) { + struct buffio *b = user; + assert(source && b && b->ofd == fd && event == OOP_WRITE); + + b->writable = 1; + do_write(b); + + return OOP_CONTINUE; } +static void* buffio_timeout_cb(oop_source *source, struct timeval tv, void *user) { + struct buffio *b = user; + assert(source && b && b->delaying); + + b->delaying = 0; + do_write(b); + + return OOP_CONTINUE; +} int buffio_write(struct buffio *b, const uint8_t *d, size_t l) { assert(b && d && l); @@ -229,9 +384,6 @@ int buffio_write(struct buffio *b, const uint8_t *d, size_t l) { i = (b->output_index + b->output_length) % b->output_max_length; m = l; - if (m > b->output_max_length-b->output_length) - m = b->output_max_length-b->output_length; - if (m > b->output_max_length-i) m = b->output_max_length-i; @@ -241,9 +393,9 @@ int buffio_write(struct buffio *b, const uint8_t *d, size_t l) { b->output_length += m; } - buffio_set_output_callbacks(b); buffio_normalize(b); - + + do_write(b); return 0; } @@ -266,6 +418,13 @@ void buffio_flush_input(struct buffio *b) { buffio_set_input_callbacks(b); } +void buffio_flush_output(struct buffio *b) { + assert(b); + + b->output_length = b->output_index = 0; + buffio_set_output_callbacks(b); +} + int buffio_find_input(struct buffio *b, const char *c) { size_t l, cl, i; @@ -284,7 +443,8 @@ int buffio_find_input(struct buffio *b, const char *c) { b->input_length = l-cl; buffio_normalize(b); - buffio_set_input_callbacks(b); + + do_read(b); return 1; } } @@ -293,7 +453,7 @@ int buffio_find_input(struct buffio *b, const char *c) { } -char *buffio_read_line(struct buffio *b, char *c, size_t l) { +char* buffio_read_line(struct buffio *b, char *c, size_t l) { size_t i; assert(b && c && l); @@ -312,8 +472,10 @@ char *buffio_read_line(struct buffio *b, char *c, size_t l) { b->input_index = (b->input_index+j+1) % b->input_max_length; b->input_length -= j+1; - buffio_set_input_callbacks(b); + buffio_normalize(b); + + do_read(b); return c; } } @@ -341,3 +503,135 @@ void buffio_dump(struct buffio *b) { } fprintf(stderr, "]\n"); } + +void buffio_set_input_watermark(struct buffio *b, ssize_t w) { + assert(b); + + if (w < 0) + b->input_watermark = b->input_max_length; + else + b->input_watermark = w; + + assert(b->input_watermark > 0 && b->input_watermark <= b->input_max_length); + buffio_set_input_callbacks(b); +} + +void buffio_set_output_watermark(struct buffio *b, ssize_t w) { + assert(b); + + if (w < 0) + b->output_watermark = b->output_max_length; + else + b->output_watermark = w; + + assert(b->output_watermark > 0 && b->output_watermark <= b->output_max_length); + buffio_set_output_callbacks(b); +} + + +const uint8_t* buffio_read_ptr(struct buffio *b, size_t *l) { + assert(b && l); + + *l = b->input_length; + + if (*l > b->input_max_length - b->input_index) + *l = b->input_max_length - b->input_index; + + return b->input_buf + b->input_index; +} + +void buffio_read_ptr_inc(struct buffio *b, size_t l) { + assert(b); + + if (!l) + return; + + assert(l <= b->input_length && l <= b->input_max_length - b->input_index); + b->input_index += l; + + while (b->input_index >= b->input_max_length) + b->input_index -= b->input_max_length; + + b->input_length -= l; + + buffio_normalize(b); + + do_read(b); +} + +uint8_t* buffio_write_ptr(struct buffio *b, size_t *l) { + size_t j; + assert(b && l); + + j = (b->output_index + b->output_length) % b->output_max_length; + + *l = b->output_max_length - b->output_length; + if (*l > b->output_max_length - j) + *l = b->output_max_length - j; + + return b->output_buf + j; +} + +void buffio_write_ptr_inc(struct buffio *b, size_t l) { + int j; + assert(b); + + if (!l) + return; + + j = (b->output_index + b->output_length) % b->output_max_length; + assert(l <= b->output_max_length - b->output_length && l <= b->output_max_length - j); + + b->output_length += l; + + buffio_normalize(b); + + do_write(b); +} + +int buffio_output_is_empty(struct buffio *b) { + assert(b); + + return b->output_length == 0; +} + +void buffio_dump_lines(struct buffio *b) { + int r, i; + assert(b); + + r = 0; + + for (i = 0; i < b->input_length; i++) + if (b->input_buf[(b->input_index+i) % b->input_max_length] == '\n') + r = i+1; + + if (r > 0) { + b->input_index = (b->input_index+r) % b->input_max_length; + b->input_length -= r; + + buffio_normalize(b); + + do_read(b); + } +} + +void buffio_set_fds(struct buffio *b, int ifd, int ofd) { + assert(b && b->ifd == -1 && b->ofd == -1); + + b->ifd = ifd; + b->ofd = ofd; + + daemon_nonblock(b->ifd, 1); + daemon_nonblock(b->ofd, 1); + + b->readable = b->writable = 0; + + buffio_set_input_callbacks(b); + buffio_set_output_callbacks(b); +} + +int buffio_can_write(struct buffio *b, size_t l) { + assert(b); + + return l <= b->output_max_length - b->output_length; +} |