From d24a3f265ec4344b5502ec57df3cf8358f6f1499 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Mon, 5 Jan 2004 22:24:10 +0000 Subject: many changes git-svn-id: file:///home/lennart/svn/public/ivam2/trunk@14 dbf6933d-3bce-0310-9bcc-ed052ba35b35 --- src/buffio.c | 276 +++++++++++++++++++++++++++++++---------------------------- 1 file changed, 146 insertions(+), 130 deletions(-) (limited to 'src/buffio.c') diff --git a/src/buffio.c b/src/buffio.c index 727821d..95db681 100644 --- a/src/buffio.c +++ b/src/buffio.c @@ -21,8 +21,8 @@ static void* buffio_timeout_cb(oop_source *source, struct timeval tv, void *user static void* buffio_read_cb(oop_source *source, int fd, oop_event event, void *user); static void* buffio_write_cb(oop_source *source, int fd, oop_event event, void *user); -static void buffio_set_input_callbacks(struct buffio *b); -static void buffio_set_output_callbacks(struct buffio *b); +static void buffio_set_writable(struct buffio *b, int v); +static void buffio_set_readable(struct buffio *b, int v); struct buffio* buffio_new(int ifd, int ofd) { struct buffio *b; @@ -32,24 +32,21 @@ struct buffio* buffio_new(int ifd, int ofd) { assert(b); memset(b, 0, sizeof(struct buffio)); - b->ifd = ifd; - b->ofd = ofd; - - daemon_nonblock(b->ifd, 1); - daemon_nonblock(b->ofd, 1); + daemon_nonblock(b->ifd = ifd, 1); + daemon_nonblock(b->ofd = ofd, 1); b->input_buf = malloc(b->input_max_length = BUFSIZE); assert(b->input_buf); b->input_length = b->input_index = 0; - b->input_watermark = b->input_max_length; /* Read some more data if possible */ + b->input_range = b->input_max_length; b->output_buf = malloc(b->output_max_length = BUFSIZE); assert(b->output_buf); b->output_length = b->output_index = 0; - b->output_watermark = 1; /* Write some data if 1 or more bytes are in the output buffer */ + b->output_range = b->output_max_length; - buffio_set_input_callbacks(b); - buffio_set_output_callbacks(b); + buffio_set_readable(b, 0); + buffio_set_writable(b, 0); return b; } @@ -114,6 +111,8 @@ static void* buffio_user_cb(oop_source *source, struct timeval tv, void *user) { r |= b->input_ready_cb(b, b->user); if (s & BUFFIO_SCHED_CB_OUTPUT_EMPTY && b->output_empty_cb && !b->output_length) r |= b->output_empty_cb(b, b->user); + if (s & BUFFIO_SCHED_CB_OUTPUT_REQUEST && b->output_request_cb && b->output_length < b->output_range) + r |= b->output_request_cb(b, b->user); if (s & BUFFIO_SCHED_CB_EOF && b->eof_cb) r |= b->eof_cb(b, b->user); if (s & BUFFIO_SCHED_CB_EPIPE && b->epipe_cb) @@ -124,9 +123,9 @@ static void* buffio_user_cb(oop_source *source, struct timeval tv, void *user) { return r == 0 ? OOP_CONTINUE : OOP_HALT; } - static void buffio_sched_cb(struct buffio *b, enum buffio_sched_cb s) { if (s & BUFFIO_SCHED_CB_INPUT_READY && !b->input_ready_cb) s &= ~BUFFIO_SCHED_CB_INPUT_READY; + if (s & BUFFIO_SCHED_CB_OUTPUT_REQUEST && !b->output_request_cb) s &= ~BUFFIO_SCHED_CB_OUTPUT_REQUEST; if (s & BUFFIO_SCHED_CB_OUTPUT_EMPTY && !b->output_empty_cb) s &= ~BUFFIO_SCHED_CB_OUTPUT_EMPTY; if (s & BUFFIO_SCHED_CB_EOF && !b->eof_cb) s &= ~BUFFIO_SCHED_CB_EOF; if (s & BUFFIO_SCHED_CB_EPIPE && !b->epipe_cb) s = ~BUFFIO_SCHED_CB_EPIPE; @@ -170,13 +169,13 @@ inline static void buffio_normalize(struct buffio *b) { b->output_index = 0; } -static void buffio_set_input_callbacks(struct buffio *b) { +static void buffio_set_readable(struct buffio *b, int v) { assert(b && event_source); if (b->ifd == -1) return; - if (!b->readable && b->input_length < b->input_watermark) { + if (!(b->readable = v)) { if (!b->b_read_cb) { /* Enable the callback */ event_source->on_fd(event_source, b->ifd, OOP_READ, buffio_read_cb, b); b->b_read_cb = 1; @@ -189,13 +188,13 @@ static void buffio_set_input_callbacks(struct buffio *b) { } } -static void buffio_set_output_callbacks(struct buffio *b) { +static void buffio_set_writable(struct buffio *b, int v) { assert(b && event_source); if (b->ofd == -1) return; - if (!b->writable && b->output_length >= b->output_watermark) { + if (!(b->writable = v)) { if (!b->b_write_cb) { /* Enable the callback */ event_source->on_fd(event_source, b->ofd, OOP_WRITE, buffio_write_cb, b); b->b_write_cb = 1; @@ -243,108 +242,109 @@ void buffio_set_delay_usec(struct buffio *b, unsigned long delay, unsigned long } static void do_read(struct buffio *b) { + ssize_t s; + size_t m, i; + assert(b); - if (b->readable && b->input_length < b->input_watermark) { - ssize_t s; - size_t m, i; - - i = (b->input_index + b->input_length) % b->input_max_length; - - m = b->input_max_length-b->input_length; - if (m > b->input_max_length-i) - m = b->input_max_length-i; - - s = read(b->ifd, b->input_buf+i, m); - b->readable = 0; - - //daemon_log(LOG_INFO, "%p: Read %u (%u) bytes.", b, s, m); - - if (s < 0) { - daemon_log(LOG_ERR, "Failed to read from file descriptor: %s", strerror(errno)); - buffio_close_input_fd(b); - buffio_sched_cb(b, BUFFIO_SCHED_CB_ERROR); - return; - - } else if (!s) { - buffio_close_input_fd(b); - buffio_sched_cb(b, BUFFIO_SCHED_CB_EOF); - return; - - } else { + if (!b->readable || b->input_length >= b->input_range || b->ifd == -1) + return; + + i = (b->input_index + b->input_length) % b->input_max_length; + m = b->input_range-b->input_length; + if (m > b->input_max_length-i) + m = b->input_max_length-i; + + s = read(b->ifd, b->input_buf+i, m); + //daemon_log(LOG_INFO, "%p: Read %u (%u) bytes.", b, s, m); + + if (s < 0) { + daemon_log(LOG_ERR, "Failed to read from file descriptor: %s", strerror(errno)); + buffio_close_input_fd(b); + buffio_sched_cb(b, BUFFIO_SCHED_CB_ERROR); + return; + + } else if (!s) { + buffio_close_input_fd(b); + buffio_sched_cb(b, BUFFIO_SCHED_CB_EOF); + return; + + } + #ifdef IODEBUG - esc_print(b, "INPUT: ", b->input_buf+i, s); + esc_print(b, "INPUT: ", b->input_buf+i, s); #endif - - assert(s <= m); - b->input_length += s; - buffio_normalize(b); - } - } - buffio_set_input_callbacks(b); + assert(s <= m); + b->input_length += s; + buffio_normalize(b); + + buffio_set_readable(b, 0); - if (b->input_length) + if (b->input_length) buffio_sched_cb(b, BUFFIO_SCHED_CB_INPUT_READY); - - return; } static void do_write(struct buffio *b) { - assert(b); + ssize_t s; + size_t m; - if (!b->delaying && b->writable && b->output_length >= b->output_watermark) { - ssize_t s; - size_t m; + assert(b); - m = b->output_length; - if (m > b->output_max_length-b->output_index) - m = b->output_max_length-b->output_index; + if (b->delaying || !b->writable || !b->output_length || b->ofd == -1) + return; - s = write(b->ofd, b->output_buf+b->output_index, m); - b->writable = 0; + if (b->prebuf && b->output_length >= b->output_range) + b->prebuf = 0; - //daemon_log(LOG_INFO, "%p: Wrote %u (%u) bytes.", b, s, m); + if (b->prebuf) + return; + + m = b->output_length; + if (m > b->output_max_length-b->output_index) + m = b->output_max_length-b->output_index; + + s = write(b->ofd, b->output_buf+b->output_index, m); + //daemon_log(LOG_INFO, "%p: Wrote %u (%u) bytes.", b, s, m); + + if (s < 0) { + buffio_close_output_fd(b); - if (s < 0) { - buffio_close_output_fd(b); + if (errno == EPIPE) { + buffio_sched_cb(b, BUFFIO_SCHED_CB_EPIPE); + return; - if (errno == EPIPE) { - buffio_sched_cb(b, BUFFIO_SCHED_CB_EPIPE); - return; - - } else { - daemon_log(LOG_ERR, "Failed to write to file descriptor: %s", strerror(errno)); - buffio_sched_cb(b, BUFFIO_SCHED_CB_ERROR); - return; - } + } else { + daemon_log(LOG_ERR, "Failed to write to file descriptor: %s", strerror(errno)); + buffio_sched_cb(b, BUFFIO_SCHED_CB_ERROR); + return; } - + } + #ifdef IODEBUG - esc_print(b, "OUTPUT: ", b->output_buf+b->output_index, s); + esc_print(b, "OUTPUT: ", b->output_buf+b->output_index, s); #endif + + assert(s > 0 && s <= m); + b->output_index = (b->output_index + s) % b->output_max_length; + b->output_length -= s; + buffio_normalize(b); + + buffio_set_writable(b, 0); + buffio_delay(b, s); - assert(s > 0 && s <= m); - b->output_index = (b->output_index + s) % b->output_max_length; - b->output_length -= s; - - buffio_normalize(b); - buffio_delay(b, s); - - if (!b->output_length) - buffio_sched_cb(b, BUFFIO_SCHED_CB_OUTPUT_EMPTY); - } else - buffio_set_output_callbacks(b); - - return; + if (b->output_length < b->output_range) + buffio_sched_cb(b, BUFFIO_SCHED_CB_OUTPUT_REQUEST); + if (!b->output_length) + buffio_sched_cb(b, BUFFIO_SCHED_CB_OUTPUT_EMPTY); } static void* buffio_read_cb(oop_source *source, int fd, oop_event event, void *user) { struct buffio *b = user; assert(source && b && b->ifd == fd && event == OOP_READ); - b->readable = 1; + buffio_set_readable(b, 1); do_read(b); return OOP_CONTINUE; @@ -354,7 +354,7 @@ static void* buffio_write_cb(oop_source *source, int fd, oop_event event, void * struct buffio *b = user; assert(source && b && b->ofd == fd && event == OOP_WRITE); - b->writable = 1; + buffio_set_writable(b, 1); do_write(b); return OOP_CONTINUE; @@ -370,12 +370,12 @@ static void* buffio_timeout_cb(oop_source *source, struct timeval tv, void *user return OOP_CONTINUE; } -int buffio_write(struct buffio *b, const uint8_t *d, size_t l) { +void buffio_write(struct buffio *b, const uint8_t *d, size_t l) { assert(b && d && l); - if (l > b->output_max_length - b->output_length) { + if (b->output_length > b->output_range || l > b->output_range - b->output_length) { daemon_log(LOG_ERR, "buffio_write() with too much data called"); - return -1; + return; } while (l > 0) { @@ -394,38 +394,34 @@ int buffio_write(struct buffio *b, const uint8_t *d, size_t l) { } buffio_normalize(b); - do_write(b); - return 0; } -int buffio_print(struct buffio *b, const char *s) { +void buffio_print(struct buffio *b, const char *s) { assert(b && s); - return buffio_write(b, (uint8_t*) s, strlen(s)); + buffio_write(b, (uint8_t*) s, strlen(s)); } -int buffio_command(struct buffio *b, const char *c) { +void buffio_command(struct buffio *b, const char *c) { assert(b && c); buffio_flush_input(b); - return buffio_print(b, c); + buffio_print(b, c); } void buffio_flush_input(struct buffio *b) { assert(b); b->input_length = b->input_index = 0; - buffio_set_input_callbacks(b); + do_read(b); } void buffio_flush_output(struct buffio *b) { assert(b); b->output_length = b->output_index = 0; - buffio_set_output_callbacks(b); } - int buffio_find_input(struct buffio *b, const char *c) { size_t l, cl, i; assert(b && c && *c); @@ -441,7 +437,6 @@ int buffio_find_input(struct buffio *b, const char *c) { if (!*p) { /* Found! */ b->input_index = j; b->input_length = l-cl; - buffio_normalize(b); do_read(b); @@ -472,7 +467,6 @@ char* buffio_read_line(struct buffio *b, char *c, size_t l) { b->input_index = (b->input_index+j+1) % b->input_max_length; b->input_length -= j+1; - buffio_normalize(b); do_read(b); @@ -504,34 +498,39 @@ void buffio_dump(struct buffio *b) { fprintf(stderr, "]\n"); } -void buffio_set_input_watermark(struct buffio *b, ssize_t w) { +void buffio_set_input_range(struct buffio *b, ssize_t w) { assert(b); if (w < 0) - b->input_watermark = b->input_max_length; + b->input_range = b->input_max_length; else - b->input_watermark = w; + b->input_range = w; + + assert(b->input_range > 0 && b->input_range <= b->input_max_length); - assert(b->input_watermark > 0 && b->input_watermark <= b->input_max_length); - buffio_set_input_callbacks(b); + do_read(b); } -void buffio_set_output_watermark(struct buffio *b, ssize_t w) { +void buffio_set_output_range(struct buffio *b, ssize_t w) { assert(b); if (w < 0) - b->output_watermark = b->output_max_length; + b->output_range = b->output_max_length; else - b->output_watermark = w; + b->output_range = w; - assert(b->output_watermark > 0 && b->output_watermark <= b->output_max_length); - buffio_set_output_callbacks(b); + assert(b->output_range > 0 && b->output_range <= b->output_max_length); + + do_write(b); } const uint8_t* buffio_read_ptr(struct buffio *b, size_t *l) { assert(b && l); + if (!b->input_length) + return NULL; + *l = b->input_length; if (*l > b->input_max_length - b->input_index) @@ -553,7 +552,6 @@ void buffio_read_ptr_inc(struct buffio *b, size_t l) { b->input_index -= b->input_max_length; b->input_length -= l; - buffio_normalize(b); do_read(b); @@ -563,9 +561,12 @@ uint8_t* buffio_write_ptr(struct buffio *b, size_t *l) { size_t j; assert(b && l); + if (b->output_length >= b->output_range) + return NULL; + j = (b->output_index + b->output_length) % b->output_max_length; - *l = b->output_max_length - b->output_length; + *l = b->output_range - b->output_length; if (*l > b->output_max_length - j) *l = b->output_max_length - j; @@ -583,7 +584,6 @@ void buffio_write_ptr_inc(struct buffio *b, size_t l) { assert(l <= b->output_max_length - b->output_length && l <= b->output_max_length - j); b->output_length += l; - buffio_normalize(b); do_write(b); @@ -595,6 +595,12 @@ int buffio_output_is_empty(struct buffio *b) { return b->output_length == 0; } +int buffio_input_is_full(struct buffio *b) { + assert(b); + + return b->input_length >= b->input_range; +} + void buffio_dump_lines(struct buffio *b) { int r, i; assert(b); @@ -608,9 +614,8 @@ void buffio_dump_lines(struct buffio *b) { if (r > 0) { b->input_index = (b->input_index+r) % b->input_max_length; b->input_length -= r; - buffio_normalize(b); - + do_read(b); } } @@ -618,20 +623,31 @@ void buffio_dump_lines(struct buffio *b) { void buffio_set_fds(struct buffio *b, int ifd, int ofd) { assert(b && b->ifd == -1 && b->ofd == -1); - b->ifd = ifd; - b->ofd = ofd; + daemon_nonblock(b->ifd = ifd, 1); + daemon_nonblock(b->ofd = ofd, 1); - daemon_nonblock(b->ifd, 1); - daemon_nonblock(b->ofd, 1); + buffio_set_readable(b, 0); + buffio_set_writable(b, 0); +} - b->readable = b->writable = 0; +int buffio_can_write(struct buffio *b, size_t l) { + assert(b); + + if (b->output_length >= b->output_range) + return 0; - buffio_set_input_callbacks(b); - buffio_set_output_callbacks(b); + return l <= b->output_range - b->output_length; } -int buffio_can_write(struct buffio *b, size_t l) { +int buffio_write_req(struct buffio *b) { assert(b); - return l <= b->output_max_length - b->output_length; + return b->output_length < b->output_range; +} + +void buffio_set_prebuf(struct buffio *b, int p) { + assert(b); + + b->prebuf = p; + do_write(b); } -- cgit