/* $Id$ */ /*** This file is part of ivam2. ivam2 is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. ivam2 is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with ivam2; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ***/ #ifdef HAVE_CONFIG_H #include #endif #include #include #include #include #include #include #include #include #include #include #include "buffio.h" #include "main.h" #include "timevalarith.h" //#define IODEBUG 1 #define BUFSIZE PIPE_BUF static void* buffio_timeout_cb(oop_source *source, struct timeval tv, void *user); static void* buffio_read_cb(oop_source *source, int fd, oop_event event, void *user); static void* buffio_write_cb(oop_source *source, int fd, oop_event event, void *user); static void buffio_set_writable(struct buffio *b, int v); static void buffio_set_readable(struct buffio *b, int v); struct buffio* buffio_new(int ifd, int ofd) { struct buffio *b; assert(b >= 0); b = malloc(sizeof(struct buffio)); assert(b); memset(b, 0, sizeof(struct buffio)); daemon_nonblock(b->ifd = ifd, 1); daemon_nonblock(b->ofd = ofd, 1); b->input_buf = malloc(b->input_max_length = BUFSIZE); assert(b->input_buf); b->input_length = b->input_index = 0; b->input_range = b->input_max_length; b->output_buf = malloc(b->output_max_length = BUFSIZE); assert(b->output_buf); b->output_length = b->output_index = 0; b->output_range = b->output_max_length; buffio_set_readable(b, 0); buffio_set_writable(b, 0); return b; } void buffio_close_input_fd(struct buffio *b) { assert(b); if (b->b_read_cb) { event_source->cancel_fd(event_source, b->ifd, OOP_READ); b->b_read_cb = 0; } if (b->ifd >= 0) { if (b->ofd != b->ifd) close(b->ifd); b->ifd = -1; } } void buffio_close_output_fd(struct buffio *b) { assert(b); if (b->b_write_cb) { event_source->cancel_fd(event_source, b->ofd, OOP_WRITE); b->b_write_cb = 0; } if (b->ofd >= 0) { if (b->ofd != b->ifd) close(b->ofd); b->ofd = -1; } } void buffio_free(struct buffio *b) { buffio_close_input_fd(b); buffio_close_output_fd(b); if (b->delaying) event_source->cancel_time(event_source, b->timeout_tv, buffio_timeout_cb, b); free(b->output_buf); free(b->input_buf); free(b); } static void* buffio_user_cb(oop_source *source, struct timeval tv, void *user) { struct buffio *b = user; enum buffio_sched_cb s; int r = 0; assert(b && source && source == event_source); s = b->sched_cb; b->sched_cb = BUFFIO_SCHED_CB_IDLE; if (s & BUFFIO_SCHED_CB_INPUT_READY && b->input_ready_cb && b->input_length) r |= b->input_ready_cb(b, b->user); if (s & BUFFIO_SCHED_CB_OUTPUT_EMPTY && b->output_empty_cb && !b->output_length) r |= b->output_empty_cb(b, b->user); if (s & BUFFIO_SCHED_CB_OUTPUT_REQUEST && b->output_request_cb && b->output_length < b->output_range) r |= b->output_request_cb(b, b->user); if (s & BUFFIO_SCHED_CB_EOF && b->eof_cb) r |= b->eof_cb(b, b->user); if (s & BUFFIO_SCHED_CB_EPIPE && b->epipe_cb) r |= b->epipe_cb(b, b->user); if (s & BUFFIO_SCHED_CB_ERROR && b->error_cb) r |= b->error_cb(b, b->user); return r == 0 ? OOP_CONTINUE : OOP_HALT; } static void buffio_sched_cb(struct buffio *b, enum buffio_sched_cb s) { if (s & BUFFIO_SCHED_CB_INPUT_READY && !b->input_ready_cb) s &= ~BUFFIO_SCHED_CB_INPUT_READY; if (s & BUFFIO_SCHED_CB_OUTPUT_REQUEST && !b->output_request_cb) s &= ~BUFFIO_SCHED_CB_OUTPUT_REQUEST; if (s & BUFFIO_SCHED_CB_OUTPUT_EMPTY && !b->output_empty_cb) s &= ~BUFFIO_SCHED_CB_OUTPUT_EMPTY; if (s & BUFFIO_SCHED_CB_EOF && !b->eof_cb) s &= ~BUFFIO_SCHED_CB_EOF; if (s & BUFFIO_SCHED_CB_EPIPE && !b->epipe_cb) s = ~BUFFIO_SCHED_CB_EPIPE; if (s & BUFFIO_SCHED_CB_ERROR && !b->error_cb) s = ~BUFFIO_SCHED_CB_ERROR; if (s == BUFFIO_SCHED_CB_IDLE) return; if (!b->sched_cb) { assert(event_source); event_source->on_time(event_source, OOP_TIME_NOW, buffio_user_cb, b); } b->sched_cb |= s; } #ifdef IODEBUG static void esc_print(struct buffio *bio, const char*c, const uint8_t *b, size_t l) { size_t i; fprintf(stderr, "[%p] %s", bio, c); for (i = 0; i < l; i++, b++) fputc(*b < 32 ? '.' : *b, stderr); fprintf(stderr, "\n"); }; #endif inline static void buffio_normalize(struct buffio *b) { assert(b); assert(b->input_index < b->input_max_length); assert(b->input_length <= b->input_max_length); if (!b->input_length) /* This will optimize throughput a bit */ b->input_index = 0; assert(b->output_index < b->output_max_length); assert(b->output_length <= b->output_max_length); if (!b->output_length) /* This will optimize throughput a bit */ b->output_index = 0; } static void buffio_set_readable(struct buffio *b, int v) { assert(b && event_source); if (b->ifd == -1) return; if (!(b->readable = v)) { if (!b->b_read_cb) { /* Enable the callback */ event_source->on_fd(event_source, b->ifd, OOP_READ, buffio_read_cb, b); b->b_read_cb = 1; } } else { if (b->b_read_cb) { /* Disable the callback */ event_source->cancel_fd(event_source, b->ifd, OOP_READ); b->b_read_cb = 0; } } } static void buffio_set_writable(struct buffio *b, int v) { assert(b && event_source); if (b->ofd == -1) return; if (!(b->writable = v)) { if (!b->b_write_cb) { /* Enable the callback */ event_source->on_fd(event_source, b->ofd, OOP_WRITE, buffio_write_cb, b); b->b_write_cb = 1; } } else { if (b->b_write_cb) { /* Disable the callback */ event_source->cancel_fd(event_source, b->ofd, OOP_WRITE); b->b_write_cb = 0; } } } static void buffio_delay(struct buffio *b, unsigned bytes) { assert(event_source && event_source->on_time); if (b->delaying) { event_source->cancel_time(event_source, b->timeout_tv, buffio_timeout_cb, b); b->delaying = 0; } if (bytes && b->output_delay_usec) { struct timeval now; gettimeofday(&now, NULL); /* Time when the last write operation is complete */ b->finish_tv = timeval_add(timeval_max(b->finish_tv, now), (uint64_t) bytes * b->output_delay_usec); /* Time when we want to write the next data to the device */ b->timeout_tv = timeval_sub(b->finish_tv, b->output_latency_usec); assert(event_source && event_source->on_time); event_source->on_time(event_source, b->timeout_tv, buffio_timeout_cb, b); b->delaying = 1; // daemon_log(LOG_INFO, "%u bytes, now = %lu|%lu; finish = %lu|%lu; timeout = %lu|%lu", bytes, now.tv_sec, now.tv_usec, b->finish_tv.tv_sec, b->finish_tv.tv_usec, b->timeout_tv.tv_sec, b->timeout_tv.tv_usec); } } void buffio_set_delay_usec(struct buffio *b, unsigned long delay, unsigned long latency) { assert(b); b->output_delay_usec = delay; b->output_latency_usec = latency; } static void do_read(struct buffio *b) { ssize_t s; size_t m, i; assert(b); if (!b->readable || b->input_length >= b->input_range || b->ifd == -1) return; i = (b->input_index + b->input_length) % b->input_max_length; m = b->input_range-b->input_length; if (m > b->input_max_length-i) m = b->input_max_length-i; s = read(b->ifd, b->input_buf+i, m); buffio_set_readable(b, 0); //daemon_log(LOG_INFO, "%p: Read %u (%u) bytes.", b, s, m); if (s < 0) { if (errno == EAGAIN) return; daemon_log(LOG_ERR, "Failed to read from file descriptor: %s", strerror(errno)); buffio_close_input_fd(b); buffio_sched_cb(b, BUFFIO_SCHED_CB_ERROR); return; } else if (!s) { buffio_close_input_fd(b); buffio_sched_cb(b, BUFFIO_SCHED_CB_EOF); return; } #ifdef IODEBUG esc_print(b, "INPUT: ", b->input_buf+i, s); #endif assert(s <= m); b->input_length += s; buffio_normalize(b); if (b->input_length) buffio_sched_cb(b, BUFFIO_SCHED_CB_INPUT_READY); } static void do_write(struct buffio *b) { ssize_t s; size_t m; assert(b); if (b->delaying || !b->writable || !b->output_length || b->ofd == -1) return; if (b->prebuf && b->output_length >= b->output_range) b->prebuf = 0; if (b->prebuf) return; m = b->output_length; if (m > b->output_max_length-b->output_index) m = b->output_max_length-b->output_index; s = write(b->ofd, b->output_buf+b->output_index, m); buffio_set_writable(b, 0); //daemon_log(LOG_INFO, "%p: Wrote %u (%u) bytes.", b, s, m); if (s < 0) { if (errno == EAGAIN) return; buffio_close_output_fd(b); if (errno == EPIPE) { buffio_sched_cb(b, BUFFIO_SCHED_CB_EPIPE); return; } else { daemon_log(LOG_ERR, "Failed to write to file descriptor: %s", strerror(errno)); buffio_sched_cb(b, BUFFIO_SCHED_CB_ERROR); return; } } #ifdef IODEBUG esc_print(b, "OUTPUT: ", b->output_buf+b->output_index, s); #endif assert(s > 0 && s <= m); b->output_index = (b->output_index + s) % b->output_max_length; b->output_length -= s; buffio_normalize(b); buffio_delay(b, s); if (b->output_length < b->output_range) buffio_sched_cb(b, BUFFIO_SCHED_CB_OUTPUT_REQUEST); if (!b->output_length) buffio_sched_cb(b, BUFFIO_SCHED_CB_OUTPUT_EMPTY); } static void* buffio_read_cb(oop_source *source, int fd, oop_event event, void *user) { struct buffio *b = user; assert(source && b && b->ifd == fd && event == OOP_READ); buffio_set_readable(b, 1); do_read(b); return OOP_CONTINUE; } static void* buffio_write_cb(oop_source *source, int fd, oop_event event, void *user) { struct buffio *b = user; assert(source && b && b->ofd == fd && event == OOP_WRITE); buffio_set_writable(b, 1); do_write(b); return OOP_CONTINUE; } static void* buffio_timeout_cb(oop_source *source, struct timeval tv, void *user) { struct buffio *b = user; assert(source && b && b->delaying); b->delaying = 0; do_write(b); return OOP_CONTINUE; } void buffio_write(struct buffio *b, const uint8_t *d, size_t l) { assert(b && d && l); if (b->output_length > b->output_range || l > b->output_range - b->output_length) { daemon_log(LOG_ERR, "buffio_write() with too much data called"); return; } while (l > 0) { size_t m, i; i = (b->output_index + b->output_length) % b->output_max_length; m = l; if (m > b->output_max_length-i) m = b->output_max_length-i; memcpy(b->output_buf+i, d, m); l -= m; b->output_length += m; } buffio_normalize(b); do_write(b); } void buffio_print(struct buffio *b, const char *s) { assert(b && s); buffio_write(b, (uint8_t*) s, strlen(s)); } void buffio_command(struct buffio *b, const char *c) { assert(b && c); buffio_flush_input(b); buffio_print(b, c); } void buffio_flush_input(struct buffio *b) { assert(b); b->input_length = b->input_index = 0; do_read(b); } void buffio_flush_output(struct buffio *b) { assert(b); b->output_length = b->output_index = 0; } int buffio_find_input(struct buffio *b, const char *c) { size_t l, cl, i; assert(b && c && *c); cl = strlen(c); for (i = b->input_index, l = b->input_length; l >= cl; i = (i+1) % b->input_max_length, l--) { const char *p; size_t j; for (j = i, p = c; *p && *p == b->input_buf[j]; p++, j = (j+1) % b->input_max_length); if (!*p) { /* Found! */ b->input_index = j; b->input_length = l-cl; buffio_normalize(b); do_read(b); return 1; } } return 0; } char* buffio_read_line(struct buffio *b, char *c, size_t l) { size_t i; assert(b && c && l); /* Look for a \n */ for (i = 0; i < b->input_length && i < l-1; i++) { if (b->input_buf[(b->input_index+i) % b->input_max_length] == '\n') { size_t j; /* Once again, now copy */ for (j = 0;; j++) { c[j] = b->input_buf[(b->input_index+j) % b->input_max_length]; /* Finished? */ if (c[j] == '\n') { c[j+1] = 0; b->input_index = (b->input_index+j+1) % b->input_max_length; b->input_length -= j+1; buffio_normalize(b); do_read(b); return c; } } } } return NULL; } void buffio_dump(struct buffio *b) { size_t i; assert(b); fprintf(stderr, "%p INPUT_BUFFER: [", b); for (i = 0; i < b->input_length; i++) { char c = b->input_buf[(i + b->input_index) % b->input_max_length]; fputc(c < 32 ? '.' : c, stderr); } fprintf(stderr, "]\n"); fprintf(stderr, "%p OUTPUT_BUFFER: [", b); for (i = 0; i < b->output_length; i++) { char c = b->output_buf[(i + b->output_index) % b->output_max_length]; fputc(c < 32 ? '.' : c, stderr); } fprintf(stderr, "]\n"); } void buffio_set_input_range(struct buffio *b, ssize_t w) { assert(b); if (w < 0) b->input_range = b->input_max_length; else b->input_range = w; assert(b->input_range > 0 && b->input_range <= b->input_max_length); do_read(b); } void buffio_set_output_range(struct buffio *b, ssize_t w) { assert(b); if (w < 0) b->output_range = b->output_max_length; else b->output_range = w; assert(b->output_range > 0 && b->output_range <= b->output_max_length); do_write(b); } const uint8_t* buffio_read_ptr(struct buffio *b, size_t *l) { assert(b && l); if (!b->input_length) return NULL; *l = b->input_length; if (*l > b->input_max_length - b->input_index) *l = b->input_max_length - b->input_index; return b->input_buf + b->input_index; } void buffio_read_ptr_inc(struct buffio *b, size_t l) { assert(b); if (!l) return; assert(l <= b->input_length && l <= b->input_max_length - b->input_index); b->input_index += l; while (b->input_index >= b->input_max_length) b->input_index -= b->input_max_length; b->input_length -= l; buffio_normalize(b); do_read(b); } uint8_t* buffio_write_ptr(struct buffio *b, size_t *l) { size_t j; assert(b && l); if (b->output_length >= b->output_range) return NULL; j = (b->output_index + b->output_length) % b->output_max_length; *l = b->output_range - b->output_length; if (*l > b->output_max_length - j) *l = b->output_max_length - j; return b->output_buf + j; } void buffio_write_ptr_inc(struct buffio *b, size_t l) { int j; assert(b); if (!l) return; j = (b->output_index + b->output_length) % b->output_max_length; assert(l <= b->output_max_length - b->output_length && l <= b->output_max_length - j); b->output_length += l; buffio_normalize(b); do_write(b); } int buffio_output_is_empty(struct buffio *b) { assert(b); return b->output_length == 0; } int buffio_input_is_full(struct buffio *b) { assert(b); return b->input_length >= b->input_range; } void buffio_dump_lines(struct buffio *b) { int r, i; assert(b); r = 0; for (i = 0; i < b->input_length; i++) if (b->input_buf[(b->input_index+i) % b->input_max_length] == '\n') r = i+1; if (r > 0) { b->input_index = (b->input_index+r) % b->input_max_length; b->input_length -= r; buffio_normalize(b); do_read(b); } } void buffio_set_fds(struct buffio *b, int ifd, int ofd) { assert(b && b->ifd == -1 && b->ofd == -1); daemon_nonblock(b->ifd = ifd, 1); daemon_nonblock(b->ofd = ofd, 1); buffio_set_readable(b, 0); buffio_set_writable(b, 0); } int buffio_can_write(struct buffio *b, size_t l) { assert(b); if (b->output_length >= b->output_range) return 0; return l <= b->output_range - b->output_length; } int buffio_write_req(struct buffio *b) { assert(b); return b->output_length < b->output_range; } void buffio_set_prebuf(struct buffio *b, int p) { assert(b); b->prebuf = p; do_write(b); }