summaryrefslogtreecommitdiffstats
path: root/src/buffio.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/buffio.c')
-rw-r--r--src/buffio.c436
1 files changed, 365 insertions, 71 deletions
diff --git a/src/buffio.c b/src/buffio.c
index 25ba5a4..727821d 100644
--- a/src/buffio.c
+++ b/src/buffio.c
@@ -3,16 +3,21 @@
#include <string.h>
#include <errno.h>
#include <stdio.h>
+#include <sys/socket.h>
#include <oop.h>
#include <libdaemon/dlog.h>
+#include <libdaemon/dnonblock.h>
#include "buffio.h"
#include "main.h"
+#include "timevalarith.h"
-#define BUFSIZE 10240
+//#define IODEBUG 1
+#define BUFSIZE PIPE_BUF
+static void* buffio_timeout_cb(oop_source *source, struct timeval tv, void *user);
static void* buffio_read_cb(oop_source *source, int fd, oop_event event, void *user);
static void* buffio_write_cb(oop_source *source, int fd, oop_event event, void *user);
@@ -29,6 +34,9 @@ struct buffio* buffio_new(int ifd, int ofd) {
b->ifd = ifd;
b->ofd = ofd;
+
+ daemon_nonblock(b->ifd, 1);
+ daemon_nonblock(b->ofd, 1);
b->input_buf = malloc(b->input_max_length = BUFSIZE);
assert(b->input_buf);
@@ -46,36 +54,99 @@ struct buffio* buffio_new(int ifd, int ofd) {
return b;
}
-static void buffio_close_fd(struct buffio *b) {
+void buffio_close_input_fd(struct buffio *b) {
assert(b);
- if (b->b_read_cb)
+ if (b->b_read_cb) {
event_source->cancel_fd(event_source, b->ifd, OOP_READ);
+ b->b_read_cb = 0;
+ }
- if (b->b_write_cb)
+ if (b->ifd >= 0) {
+
+ if (b->ofd != b->ifd)
+ close(b->ifd);
+
+ b->ifd = -1;
+ }
+}
+
+void buffio_close_output_fd(struct buffio *b) {
+ assert(b);
+
+ if (b->b_write_cb) {
event_source->cancel_fd(event_source, b->ofd, OOP_WRITE);
+ b->b_write_cb = 0;
+ }
- if (b->ifd >= 0)
- close(b->ifd);
- if (b->ofd >= 0 && b->ofd != b->ifd)
- close(b->ofd);
+ if (b->ofd >= 0) {
- b->b_read_cb = b->b_write_cb = 0;
- b->ifd = b->ofd = -1;
+ if (b->ofd != b->ifd)
+ close(b->ofd);
+
+ b->ofd = -1;
+ }
}
-
void buffio_free(struct buffio *b) {
- buffio_close_fd(b);
+ buffio_close_input_fd(b);
+ buffio_close_output_fd(b);
+
+ if (b->delaying)
+ event_source->cancel_time(event_source, b->timeout_tv, buffio_timeout_cb, b);
+
free(b->output_buf);
free(b->input_buf);
free(b);
}
+static void* buffio_user_cb(oop_source *source, struct timeval tv, void *user) {
+ struct buffio *b = user;
+ enum buffio_sched_cb s;
+
+ int r = 0;
+ assert(b && source && source == event_source);
+
+ s = b->sched_cb;
+ b->sched_cb = BUFFIO_SCHED_CB_IDLE;
+
+ if (s & BUFFIO_SCHED_CB_INPUT_READY && b->input_ready_cb && b->input_length)
+ r |= b->input_ready_cb(b, b->user);
+ if (s & BUFFIO_SCHED_CB_OUTPUT_EMPTY && b->output_empty_cb && !b->output_length)
+ r |= b->output_empty_cb(b, b->user);
+ if (s & BUFFIO_SCHED_CB_EOF && b->eof_cb)
+ r |= b->eof_cb(b, b->user);
+ if (s & BUFFIO_SCHED_CB_EPIPE && b->epipe_cb)
+ r |= b->epipe_cb(b, b->user);
+ if (s & BUFFIO_SCHED_CB_ERROR && b->error_cb)
+ r |= b->error_cb(b, b->user);
+
+ return r == 0 ? OOP_CONTINUE : OOP_HALT;
+}
+
+
+static void buffio_sched_cb(struct buffio *b, enum buffio_sched_cb s) {
+ if (s & BUFFIO_SCHED_CB_INPUT_READY && !b->input_ready_cb) s &= ~BUFFIO_SCHED_CB_INPUT_READY;
+ if (s & BUFFIO_SCHED_CB_OUTPUT_EMPTY && !b->output_empty_cb) s &= ~BUFFIO_SCHED_CB_OUTPUT_EMPTY;
+ if (s & BUFFIO_SCHED_CB_EOF && !b->eof_cb) s &= ~BUFFIO_SCHED_CB_EOF;
+ if (s & BUFFIO_SCHED_CB_EPIPE && !b->epipe_cb) s = ~BUFFIO_SCHED_CB_EPIPE;
+ if (s & BUFFIO_SCHED_CB_ERROR && !b->error_cb) s = ~BUFFIO_SCHED_CB_ERROR;
+
+ if (s == BUFFIO_SCHED_CB_IDLE)
+ return;
+
+ if (!b->sched_cb) {
+ assert(event_source);
+ event_source->on_time(event_source, OOP_TIME_NOW, buffio_user_cb, b);
+ }
+
+ b->sched_cb |= s;
+}
+
#ifdef IODEBUG
-static void esc_print(const char*c, const uint8_t *b, size_t l) {
+static void esc_print(struct buffio *bio, const char*c, const uint8_t *b, size_t l) {
size_t i;
- fprintf(stderr, "%s", c);
+ fprintf(stderr, "[%p] %s", bio, c);
for (i = 0; i < l; i++, b++)
fputc(*b < 32 ? '.' : *b, stderr);
@@ -83,7 +154,7 @@ static void esc_print(const char*c, const uint8_t *b, size_t l) {
};
#endif
-static void buffio_normalize(struct buffio *b) {
+inline static void buffio_normalize(struct buffio *b) {
assert(b);
assert(b->input_index < b->input_max_length);
@@ -92,14 +163,20 @@ static void buffio_normalize(struct buffio *b) {
if (!b->input_length) /* This will optimize throughput a bit */
b->input_index = 0;
+ assert(b->output_index < b->output_max_length);
+ assert(b->output_length <= b->output_max_length);
+
if (!b->output_length) /* This will optimize throughput a bit */
b->output_index = 0;
}
static void buffio_set_input_callbacks(struct buffio *b) {
- assert(b && b->ifd >= 0 && event_source);
+ assert(b && event_source);
- if (b->input_length <= b->input_watermark) {
+ if (b->ifd == -1)
+ return;
+
+ if (!b->readable && b->input_length < b->input_watermark) {
if (!b->b_read_cb) { /* Enable the callback */
event_source->on_fd(event_source, b->ifd, OOP_READ, buffio_read_cb, b);
b->b_read_cb = 1;
@@ -113,9 +190,12 @@ static void buffio_set_input_callbacks(struct buffio *b) {
}
static void buffio_set_output_callbacks(struct buffio *b) {
- assert(b && b->ofd >= 0 && event_source);
+ assert(b && event_source);
- if (b->output_length >= b->output_watermark) {
+ if (b->ofd == -1)
+ return;
+
+ if (!b->writable && b->output_length >= b->output_watermark) {
if (!b->b_write_cb) { /* Enable the callback */
event_source->on_fd(event_source, b->ofd, OOP_WRITE, buffio_write_cb, b);
b->b_write_cb = 1;
@@ -128,11 +208,44 @@ static void buffio_set_output_callbacks(struct buffio *b) {
}
}
-static void* buffio_read_cb(oop_source *source, int fd, oop_event event, void *user) {
- struct buffio *b = user;
- assert(source && b && b->ifd == fd && event == OOP_READ);
+static void buffio_delay(struct buffio *b, unsigned bytes) {
+ assert(event_source && event_source->on_time);
+
+ if (b->delaying) {
+ event_source->cancel_time(event_source, b->timeout_tv, buffio_timeout_cb, b);
+ b->delaying = 0;
+ }
+
+ if (bytes && b->output_delay_usec) {
+ struct timeval now;
+
+ gettimeofday(&now, NULL);
+
+ /* Time when the last write operation is complete */
+ b->finish_tv = timeval_add(timeval_max(b->finish_tv, now), (uint64_t) bytes * b->output_delay_usec);
+
+ /* Time when we want to write the next data to the device */
+ b->timeout_tv = timeval_sub(b->finish_tv, b->output_latency_usec);
+
+ assert(event_source && event_source->on_time);
+ event_source->on_time(event_source, b->timeout_tv, buffio_timeout_cb, b);
+ b->delaying = 1;
+
+// daemon_log(LOG_INFO, "%u bytes, now = %lu|%lu; finish = %lu|%lu; timeout = %lu|%lu", bytes, now.tv_sec, now.tv_usec, b->finish_tv.tv_sec, b->finish_tv.tv_usec, b->timeout_tv.tv_sec, b->timeout_tv.tv_usec);
+ }
+}
- if (b->input_length < b->input_max_length) {
+void buffio_set_delay_usec(struct buffio *b, unsigned long delay, unsigned long latency) {
+ assert(b);
+
+ b->output_delay_usec = delay;
+ b->output_latency_usec = latency;
+}
+
+static void do_read(struct buffio *b) {
+ assert(b);
+
+ if (b->readable && b->input_length < b->input_watermark) {
ssize_t s;
size_t m, i;
@@ -141,41 +254,47 @@ static void* buffio_read_cb(oop_source *source, int fd, oop_event event, void *u
m = b->input_max_length-b->input_length;
if (m > b->input_max_length-i)
m = b->input_max_length-i;
-
- if ((s = read(fd, b->input_buf+i, m)) < 0) {
+
+ s = read(b->ifd, b->input_buf+i, m);
+ b->readable = 0;
+
+ //daemon_log(LOG_INFO, "%p: Read %u (%u) bytes.", b, s, m);
+
+ if (s < 0) {
daemon_log(LOG_ERR, "Failed to read from file descriptor: %s", strerror(errno));
- return OOP_HALT;
- }
+ buffio_close_input_fd(b);
+ buffio_sched_cb(b, BUFFIO_SCHED_CB_ERROR);
+ return;
+
+ } else if (!s) {
+ buffio_close_input_fd(b);
+ buffio_sched_cb(b, BUFFIO_SCHED_CB_EOF);
+ return;
+
+ } else {
#ifdef IODEBUG
- esc_print("INPUT: ", b->input_buf+i, s);
+ esc_print(b, "INPUT: ", b->input_buf+i, s);
#endif
-
- if (!s) {
- buffio_close_fd(b);
- if (b->eof_cb)
- b->eof_cb(b, b->user);
- return OOP_CONTINUE;
+
+ assert(s <= m);
+ b->input_length += s;
+ buffio_normalize(b);
}
-
- assert(s <= m);
- b->input_length += s;
}
- buffio_normalize(b);
buffio_set_input_callbacks(b);
- if (b->input_length && b->input_ready_cb)
- b->input_ready_cb(b, b->user);
+ if (b->input_length)
+ buffio_sched_cb(b, BUFFIO_SCHED_CB_INPUT_READY);
- return OOP_CONTINUE;
+ return;
}
-static void* buffio_write_cb(oop_source *source, int fd, oop_event event, void *user) {
- struct buffio *b = user;
- assert(source && b && b->ofd == fd && event == OOP_WRITE);
+static void do_write(struct buffio *b) {
+ assert(b);
- if (b->output_length > 0) {
+ if (!b->delaying && b->writable && b->output_length >= b->output_watermark) {
ssize_t s;
size_t m;
@@ -183,37 +302,73 @@ static void* buffio_write_cb(oop_source *source, int fd, oop_event event, void *
if (m > b->output_max_length-b->output_index)
m = b->output_max_length-b->output_index;
- if ((s = write(fd, b->output_buf+b->output_index, m)) < 0) {
- daemon_log(LOG_ERR, "Failed to write to file descriptor: %s", strerror(errno));
- return OOP_HALT;
- }
+ s = write(b->ofd, b->output_buf+b->output_index, m);
+ b->writable = 0;
-#ifdef IODEBUG
- esc_print("OUTPUT: ", b->output_buf+b->output_index, s);
-#endif
+ //daemon_log(LOG_INFO, "%p: Wrote %u (%u) bytes.", b, s, m);
- if (!s) {
- buffio_close_fd(b);
- if (b->eof_cb)
- b->eof_cb(b, b->user);
- return OOP_CONTINUE;
+ if (s < 0) {
+ buffio_close_output_fd(b);
+
+ if (errno == EPIPE) {
+ buffio_sched_cb(b, BUFFIO_SCHED_CB_EPIPE);
+ return;
+
+ } else {
+ daemon_log(LOG_ERR, "Failed to write to file descriptor: %s", strerror(errno));
+ buffio_sched_cb(b, BUFFIO_SCHED_CB_ERROR);
+ return;
+ }
}
+
+#ifdef IODEBUG
+ esc_print(b, "OUTPUT: ", b->output_buf+b->output_index, s);
+#endif
- assert(s <= m);
+ assert(s > 0 && s <= m);
b->output_index = (b->output_index + s) % b->output_max_length;
b->output_length -= s;
- }
- buffio_normalize(b);
- buffio_set_output_callbacks(b);
+ buffio_normalize(b);
+ buffio_delay(b, s);
- if (!b->output_length && b->output_empty_cb)
- b->output_empty_cb(b, b->user);
+ if (!b->output_length)
+ buffio_sched_cb(b, BUFFIO_SCHED_CB_OUTPUT_EMPTY);
+ } else
+ buffio_set_output_callbacks(b);
+
+ return;
+}
+
+static void* buffio_read_cb(oop_source *source, int fd, oop_event event, void *user) {
+ struct buffio *b = user;
+ assert(source && b && b->ifd == fd && event == OOP_READ);
+
+ b->readable = 1;
+ do_read(b);
return OOP_CONTINUE;
+}
+static void* buffio_write_cb(oop_source *source, int fd, oop_event event, void *user) {
+ struct buffio *b = user;
+ assert(source && b && b->ofd == fd && event == OOP_WRITE);
+
+ b->writable = 1;
+ do_write(b);
+
+ return OOP_CONTINUE;
}
+static void* buffio_timeout_cb(oop_source *source, struct timeval tv, void *user) {
+ struct buffio *b = user;
+ assert(source && b && b->delaying);
+
+ b->delaying = 0;
+ do_write(b);
+
+ return OOP_CONTINUE;
+}
int buffio_write(struct buffio *b, const uint8_t *d, size_t l) {
assert(b && d && l);
@@ -229,9 +384,6 @@ int buffio_write(struct buffio *b, const uint8_t *d, size_t l) {
i = (b->output_index + b->output_length) % b->output_max_length;
m = l;
- if (m > b->output_max_length-b->output_length)
- m = b->output_max_length-b->output_length;
-
if (m > b->output_max_length-i)
m = b->output_max_length-i;
@@ -241,9 +393,9 @@ int buffio_write(struct buffio *b, const uint8_t *d, size_t l) {
b->output_length += m;
}
- buffio_set_output_callbacks(b);
buffio_normalize(b);
-
+
+ do_write(b);
return 0;
}
@@ -266,6 +418,13 @@ void buffio_flush_input(struct buffio *b) {
buffio_set_input_callbacks(b);
}
+void buffio_flush_output(struct buffio *b) {
+ assert(b);
+
+ b->output_length = b->output_index = 0;
+ buffio_set_output_callbacks(b);
+}
+
int buffio_find_input(struct buffio *b, const char *c) {
size_t l, cl, i;
@@ -284,7 +443,8 @@ int buffio_find_input(struct buffio *b, const char *c) {
b->input_length = l-cl;
buffio_normalize(b);
- buffio_set_input_callbacks(b);
+
+ do_read(b);
return 1;
}
}
@@ -293,7 +453,7 @@ int buffio_find_input(struct buffio *b, const char *c) {
}
-char *buffio_read_line(struct buffio *b, char *c, size_t l) {
+char* buffio_read_line(struct buffio *b, char *c, size_t l) {
size_t i;
assert(b && c && l);
@@ -312,8 +472,10 @@ char *buffio_read_line(struct buffio *b, char *c, size_t l) {
b->input_index = (b->input_index+j+1) % b->input_max_length;
b->input_length -= j+1;
- buffio_set_input_callbacks(b);
+
buffio_normalize(b);
+
+ do_read(b);
return c;
}
}
@@ -341,3 +503,135 @@ void buffio_dump(struct buffio *b) {
}
fprintf(stderr, "]\n");
}
+
+void buffio_set_input_watermark(struct buffio *b, ssize_t w) {
+ assert(b);
+
+ if (w < 0)
+ b->input_watermark = b->input_max_length;
+ else
+ b->input_watermark = w;
+
+ assert(b->input_watermark > 0 && b->input_watermark <= b->input_max_length);
+ buffio_set_input_callbacks(b);
+}
+
+void buffio_set_output_watermark(struct buffio *b, ssize_t w) {
+ assert(b);
+
+ if (w < 0)
+ b->output_watermark = b->output_max_length;
+ else
+ b->output_watermark = w;
+
+ assert(b->output_watermark > 0 && b->output_watermark <= b->output_max_length);
+ buffio_set_output_callbacks(b);
+}
+
+
+const uint8_t* buffio_read_ptr(struct buffio *b, size_t *l) {
+ assert(b && l);
+
+ *l = b->input_length;
+
+ if (*l > b->input_max_length - b->input_index)
+ *l = b->input_max_length - b->input_index;
+
+ return b->input_buf + b->input_index;
+}
+
+void buffio_read_ptr_inc(struct buffio *b, size_t l) {
+ assert(b);
+
+ if (!l)
+ return;
+
+ assert(l <= b->input_length && l <= b->input_max_length - b->input_index);
+ b->input_index += l;
+
+ while (b->input_index >= b->input_max_length)
+ b->input_index -= b->input_max_length;
+
+ b->input_length -= l;
+
+ buffio_normalize(b);
+
+ do_read(b);
+}
+
+uint8_t* buffio_write_ptr(struct buffio *b, size_t *l) {
+ size_t j;
+ assert(b && l);
+
+ j = (b->output_index + b->output_length) % b->output_max_length;
+
+ *l = b->output_max_length - b->output_length;
+ if (*l > b->output_max_length - j)
+ *l = b->output_max_length - j;
+
+ return b->output_buf + j;
+}
+
+void buffio_write_ptr_inc(struct buffio *b, size_t l) {
+ int j;
+ assert(b);
+
+ if (!l)
+ return;
+
+ j = (b->output_index + b->output_length) % b->output_max_length;
+ assert(l <= b->output_max_length - b->output_length && l <= b->output_max_length - j);
+
+ b->output_length += l;
+
+ buffio_normalize(b);
+
+ do_write(b);
+}
+
+int buffio_output_is_empty(struct buffio *b) {
+ assert(b);
+
+ return b->output_length == 0;
+}
+
+void buffio_dump_lines(struct buffio *b) {
+ int r, i;
+ assert(b);
+
+ r = 0;
+
+ for (i = 0; i < b->input_length; i++)
+ if (b->input_buf[(b->input_index+i) % b->input_max_length] == '\n')
+ r = i+1;
+
+ if (r > 0) {
+ b->input_index = (b->input_index+r) % b->input_max_length;
+ b->input_length -= r;
+
+ buffio_normalize(b);
+
+ do_read(b);
+ }
+}
+
+void buffio_set_fds(struct buffio *b, int ifd, int ofd) {
+ assert(b && b->ifd == -1 && b->ofd == -1);
+
+ b->ifd = ifd;
+ b->ofd = ofd;
+
+ daemon_nonblock(b->ifd, 1);
+ daemon_nonblock(b->ofd, 1);
+
+ b->readable = b->writable = 0;
+
+ buffio_set_input_callbacks(b);
+ buffio_set_output_callbacks(b);
+}
+
+int buffio_can_write(struct buffio *b, size_t l) {
+ assert(b);
+
+ return l <= b->output_max_length - b->output_length;
+}