summaryrefslogtreecommitdiffstats
path: root/src/buffio.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/buffio.c')
-rw-r--r--src/buffio.c276
1 files changed, 146 insertions, 130 deletions
diff --git a/src/buffio.c b/src/buffio.c
index 727821d..95db681 100644
--- a/src/buffio.c
+++ b/src/buffio.c
@@ -21,8 +21,8 @@ static void* buffio_timeout_cb(oop_source *source, struct timeval tv, void *user
static void* buffio_read_cb(oop_source *source, int fd, oop_event event, void *user);
static void* buffio_write_cb(oop_source *source, int fd, oop_event event, void *user);
-static void buffio_set_input_callbacks(struct buffio *b);
-static void buffio_set_output_callbacks(struct buffio *b);
+static void buffio_set_writable(struct buffio *b, int v);
+static void buffio_set_readable(struct buffio *b, int v);
struct buffio* buffio_new(int ifd, int ofd) {
struct buffio *b;
@@ -32,24 +32,21 @@ struct buffio* buffio_new(int ifd, int ofd) {
assert(b);
memset(b, 0, sizeof(struct buffio));
- b->ifd = ifd;
- b->ofd = ofd;
-
- daemon_nonblock(b->ifd, 1);
- daemon_nonblock(b->ofd, 1);
+ daemon_nonblock(b->ifd = ifd, 1);
+ daemon_nonblock(b->ofd = ofd, 1);
b->input_buf = malloc(b->input_max_length = BUFSIZE);
assert(b->input_buf);
b->input_length = b->input_index = 0;
- b->input_watermark = b->input_max_length; /* Read some more data if possible */
+ b->input_range = b->input_max_length;
b->output_buf = malloc(b->output_max_length = BUFSIZE);
assert(b->output_buf);
b->output_length = b->output_index = 0;
- b->output_watermark = 1; /* Write some data if 1 or more bytes are in the output buffer */
+ b->output_range = b->output_max_length;
- buffio_set_input_callbacks(b);
- buffio_set_output_callbacks(b);
+ buffio_set_readable(b, 0);
+ buffio_set_writable(b, 0);
return b;
}
@@ -114,6 +111,8 @@ static void* buffio_user_cb(oop_source *source, struct timeval tv, void *user) {
r |= b->input_ready_cb(b, b->user);
if (s & BUFFIO_SCHED_CB_OUTPUT_EMPTY && b->output_empty_cb && !b->output_length)
r |= b->output_empty_cb(b, b->user);
+ if (s & BUFFIO_SCHED_CB_OUTPUT_REQUEST && b->output_request_cb && b->output_length < b->output_range)
+ r |= b->output_request_cb(b, b->user);
if (s & BUFFIO_SCHED_CB_EOF && b->eof_cb)
r |= b->eof_cb(b, b->user);
if (s & BUFFIO_SCHED_CB_EPIPE && b->epipe_cb)
@@ -124,9 +123,9 @@ static void* buffio_user_cb(oop_source *source, struct timeval tv, void *user) {
return r == 0 ? OOP_CONTINUE : OOP_HALT;
}
-
static void buffio_sched_cb(struct buffio *b, enum buffio_sched_cb s) {
if (s & BUFFIO_SCHED_CB_INPUT_READY && !b->input_ready_cb) s &= ~BUFFIO_SCHED_CB_INPUT_READY;
+ if (s & BUFFIO_SCHED_CB_OUTPUT_REQUEST && !b->output_request_cb) s &= ~BUFFIO_SCHED_CB_OUTPUT_REQUEST;
if (s & BUFFIO_SCHED_CB_OUTPUT_EMPTY && !b->output_empty_cb) s &= ~BUFFIO_SCHED_CB_OUTPUT_EMPTY;
if (s & BUFFIO_SCHED_CB_EOF && !b->eof_cb) s &= ~BUFFIO_SCHED_CB_EOF;
if (s & BUFFIO_SCHED_CB_EPIPE && !b->epipe_cb) s = ~BUFFIO_SCHED_CB_EPIPE;
@@ -170,13 +169,13 @@ inline static void buffio_normalize(struct buffio *b) {
b->output_index = 0;
}
-static void buffio_set_input_callbacks(struct buffio *b) {
+static void buffio_set_readable(struct buffio *b, int v) {
assert(b && event_source);
if (b->ifd == -1)
return;
- if (!b->readable && b->input_length < b->input_watermark) {
+ if (!(b->readable = v)) {
if (!b->b_read_cb) { /* Enable the callback */
event_source->on_fd(event_source, b->ifd, OOP_READ, buffio_read_cb, b);
b->b_read_cb = 1;
@@ -189,13 +188,13 @@ static void buffio_set_input_callbacks(struct buffio *b) {
}
}
-static void buffio_set_output_callbacks(struct buffio *b) {
+static void buffio_set_writable(struct buffio *b, int v) {
assert(b && event_source);
if (b->ofd == -1)
return;
- if (!b->writable && b->output_length >= b->output_watermark) {
+ if (!(b->writable = v)) {
if (!b->b_write_cb) { /* Enable the callback */
event_source->on_fd(event_source, b->ofd, OOP_WRITE, buffio_write_cb, b);
b->b_write_cb = 1;
@@ -243,108 +242,109 @@ void buffio_set_delay_usec(struct buffio *b, unsigned long delay, unsigned long
}
static void do_read(struct buffio *b) {
+ ssize_t s;
+ size_t m, i;
+
assert(b);
- if (b->readable && b->input_length < b->input_watermark) {
- ssize_t s;
- size_t m, i;
-
- i = (b->input_index + b->input_length) % b->input_max_length;
-
- m = b->input_max_length-b->input_length;
- if (m > b->input_max_length-i)
- m = b->input_max_length-i;
-
- s = read(b->ifd, b->input_buf+i, m);
- b->readable = 0;
-
- //daemon_log(LOG_INFO, "%p: Read %u (%u) bytes.", b, s, m);
-
- if (s < 0) {
- daemon_log(LOG_ERR, "Failed to read from file descriptor: %s", strerror(errno));
- buffio_close_input_fd(b);
- buffio_sched_cb(b, BUFFIO_SCHED_CB_ERROR);
- return;
-
- } else if (!s) {
- buffio_close_input_fd(b);
- buffio_sched_cb(b, BUFFIO_SCHED_CB_EOF);
- return;
-
- } else {
+ if (!b->readable || b->input_length >= b->input_range || b->ifd == -1)
+ return;
+
+ i = (b->input_index + b->input_length) % b->input_max_length;
+ m = b->input_range-b->input_length;
+ if (m > b->input_max_length-i)
+ m = b->input_max_length-i;
+
+ s = read(b->ifd, b->input_buf+i, m);
+ //daemon_log(LOG_INFO, "%p: Read %u (%u) bytes.", b, s, m);
+
+ if (s < 0) {
+ daemon_log(LOG_ERR, "Failed to read from file descriptor: %s", strerror(errno));
+ buffio_close_input_fd(b);
+ buffio_sched_cb(b, BUFFIO_SCHED_CB_ERROR);
+ return;
+
+ } else if (!s) {
+ buffio_close_input_fd(b);
+ buffio_sched_cb(b, BUFFIO_SCHED_CB_EOF);
+ return;
+
+ }
+
#ifdef IODEBUG
- esc_print(b, "INPUT: ", b->input_buf+i, s);
+ esc_print(b, "INPUT: ", b->input_buf+i, s);
#endif
-
- assert(s <= m);
- b->input_length += s;
- buffio_normalize(b);
- }
- }
- buffio_set_input_callbacks(b);
+ assert(s <= m);
+ b->input_length += s;
+ buffio_normalize(b);
+
+ buffio_set_readable(b, 0);
- if (b->input_length)
+ if (b->input_length)
buffio_sched_cb(b, BUFFIO_SCHED_CB_INPUT_READY);
-
- return;
}
static void do_write(struct buffio *b) {
- assert(b);
+ ssize_t s;
+ size_t m;
- if (!b->delaying && b->writable && b->output_length >= b->output_watermark) {
- ssize_t s;
- size_t m;
+ assert(b);
- m = b->output_length;
- if (m > b->output_max_length-b->output_index)
- m = b->output_max_length-b->output_index;
+ if (b->delaying || !b->writable || !b->output_length || b->ofd == -1)
+ return;
- s = write(b->ofd, b->output_buf+b->output_index, m);
- b->writable = 0;
+ if (b->prebuf && b->output_length >= b->output_range)
+ b->prebuf = 0;
- //daemon_log(LOG_INFO, "%p: Wrote %u (%u) bytes.", b, s, m);
+ if (b->prebuf)
+ return;
+
+ m = b->output_length;
+ if (m > b->output_max_length-b->output_index)
+ m = b->output_max_length-b->output_index;
+
+ s = write(b->ofd, b->output_buf+b->output_index, m);
+ //daemon_log(LOG_INFO, "%p: Wrote %u (%u) bytes.", b, s, m);
+
+ if (s < 0) {
+ buffio_close_output_fd(b);
- if (s < 0) {
- buffio_close_output_fd(b);
+ if (errno == EPIPE) {
+ buffio_sched_cb(b, BUFFIO_SCHED_CB_EPIPE);
+ return;
- if (errno == EPIPE) {
- buffio_sched_cb(b, BUFFIO_SCHED_CB_EPIPE);
- return;
-
- } else {
- daemon_log(LOG_ERR, "Failed to write to file descriptor: %s", strerror(errno));
- buffio_sched_cb(b, BUFFIO_SCHED_CB_ERROR);
- return;
- }
+ } else {
+ daemon_log(LOG_ERR, "Failed to write to file descriptor: %s", strerror(errno));
+ buffio_sched_cb(b, BUFFIO_SCHED_CB_ERROR);
+ return;
}
-
+ }
+
#ifdef IODEBUG
- esc_print(b, "OUTPUT: ", b->output_buf+b->output_index, s);
+ esc_print(b, "OUTPUT: ", b->output_buf+b->output_index, s);
#endif
+
+ assert(s > 0 && s <= m);
+ b->output_index = (b->output_index + s) % b->output_max_length;
+ b->output_length -= s;
+ buffio_normalize(b);
+
+ buffio_set_writable(b, 0);
+ buffio_delay(b, s);
- assert(s > 0 && s <= m);
- b->output_index = (b->output_index + s) % b->output_max_length;
- b->output_length -= s;
-
- buffio_normalize(b);
- buffio_delay(b, s);
-
- if (!b->output_length)
- buffio_sched_cb(b, BUFFIO_SCHED_CB_OUTPUT_EMPTY);
- } else
- buffio_set_output_callbacks(b);
-
- return;
+ if (b->output_length < b->output_range)
+ buffio_sched_cb(b, BUFFIO_SCHED_CB_OUTPUT_REQUEST);
+ if (!b->output_length)
+ buffio_sched_cb(b, BUFFIO_SCHED_CB_OUTPUT_EMPTY);
}
static void* buffio_read_cb(oop_source *source, int fd, oop_event event, void *user) {
struct buffio *b = user;
assert(source && b && b->ifd == fd && event == OOP_READ);
- b->readable = 1;
+ buffio_set_readable(b, 1);
do_read(b);
return OOP_CONTINUE;
@@ -354,7 +354,7 @@ static void* buffio_write_cb(oop_source *source, int fd, oop_event event, void *
struct buffio *b = user;
assert(source && b && b->ofd == fd && event == OOP_WRITE);
- b->writable = 1;
+ buffio_set_writable(b, 1);
do_write(b);
return OOP_CONTINUE;
@@ -370,12 +370,12 @@ static void* buffio_timeout_cb(oop_source *source, struct timeval tv, void *user
return OOP_CONTINUE;
}
-int buffio_write(struct buffio *b, const uint8_t *d, size_t l) {
+void buffio_write(struct buffio *b, const uint8_t *d, size_t l) {
assert(b && d && l);
- if (l > b->output_max_length - b->output_length) {
+ if (b->output_length > b->output_range || l > b->output_range - b->output_length) {
daemon_log(LOG_ERR, "buffio_write() with too much data called");
- return -1;
+ return;
}
while (l > 0) {
@@ -394,38 +394,34 @@ int buffio_write(struct buffio *b, const uint8_t *d, size_t l) {
}
buffio_normalize(b);
-
do_write(b);
- return 0;
}
-int buffio_print(struct buffio *b, const char *s) {
+void buffio_print(struct buffio *b, const char *s) {
assert(b && s);
- return buffio_write(b, (uint8_t*) s, strlen(s));
+ buffio_write(b, (uint8_t*) s, strlen(s));
}
-int buffio_command(struct buffio *b, const char *c) {
+void buffio_command(struct buffio *b, const char *c) {
assert(b && c);
buffio_flush_input(b);
- return buffio_print(b, c);
+ buffio_print(b, c);
}
void buffio_flush_input(struct buffio *b) {
assert(b);
b->input_length = b->input_index = 0;
- buffio_set_input_callbacks(b);
+ do_read(b);
}
void buffio_flush_output(struct buffio *b) {
assert(b);
b->output_length = b->output_index = 0;
- buffio_set_output_callbacks(b);
}
-
int buffio_find_input(struct buffio *b, const char *c) {
size_t l, cl, i;
assert(b && c && *c);
@@ -441,7 +437,6 @@ int buffio_find_input(struct buffio *b, const char *c) {
if (!*p) { /* Found! */
b->input_index = j;
b->input_length = l-cl;
-
buffio_normalize(b);
do_read(b);
@@ -472,7 +467,6 @@ char* buffio_read_line(struct buffio *b, char *c, size_t l) {
b->input_index = (b->input_index+j+1) % b->input_max_length;
b->input_length -= j+1;
-
buffio_normalize(b);
do_read(b);
@@ -504,34 +498,39 @@ void buffio_dump(struct buffio *b) {
fprintf(stderr, "]\n");
}
-void buffio_set_input_watermark(struct buffio *b, ssize_t w) {
+void buffio_set_input_range(struct buffio *b, ssize_t w) {
assert(b);
if (w < 0)
- b->input_watermark = b->input_max_length;
+ b->input_range = b->input_max_length;
else
- b->input_watermark = w;
+ b->input_range = w;
+
+ assert(b->input_range > 0 && b->input_range <= b->input_max_length);
- assert(b->input_watermark > 0 && b->input_watermark <= b->input_max_length);
- buffio_set_input_callbacks(b);
+ do_read(b);
}
-void buffio_set_output_watermark(struct buffio *b, ssize_t w) {
+void buffio_set_output_range(struct buffio *b, ssize_t w) {
assert(b);
if (w < 0)
- b->output_watermark = b->output_max_length;
+ b->output_range = b->output_max_length;
else
- b->output_watermark = w;
+ b->output_range = w;
- assert(b->output_watermark > 0 && b->output_watermark <= b->output_max_length);
- buffio_set_output_callbacks(b);
+ assert(b->output_range > 0 && b->output_range <= b->output_max_length);
+
+ do_write(b);
}
const uint8_t* buffio_read_ptr(struct buffio *b, size_t *l) {
assert(b && l);
+ if (!b->input_length)
+ return NULL;
+
*l = b->input_length;
if (*l > b->input_max_length - b->input_index)
@@ -553,7 +552,6 @@ void buffio_read_ptr_inc(struct buffio *b, size_t l) {
b->input_index -= b->input_max_length;
b->input_length -= l;
-
buffio_normalize(b);
do_read(b);
@@ -563,9 +561,12 @@ uint8_t* buffio_write_ptr(struct buffio *b, size_t *l) {
size_t j;
assert(b && l);
+ if (b->output_length >= b->output_range)
+ return NULL;
+
j = (b->output_index + b->output_length) % b->output_max_length;
- *l = b->output_max_length - b->output_length;
+ *l = b->output_range - b->output_length;
if (*l > b->output_max_length - j)
*l = b->output_max_length - j;
@@ -583,7 +584,6 @@ void buffio_write_ptr_inc(struct buffio *b, size_t l) {
assert(l <= b->output_max_length - b->output_length && l <= b->output_max_length - j);
b->output_length += l;
-
buffio_normalize(b);
do_write(b);
@@ -595,6 +595,12 @@ int buffio_output_is_empty(struct buffio *b) {
return b->output_length == 0;
}
+int buffio_input_is_full(struct buffio *b) {
+ assert(b);
+
+ return b->input_length >= b->input_range;
+}
+
void buffio_dump_lines(struct buffio *b) {
int r, i;
assert(b);
@@ -608,9 +614,8 @@ void buffio_dump_lines(struct buffio *b) {
if (r > 0) {
b->input_index = (b->input_index+r) % b->input_max_length;
b->input_length -= r;
-
buffio_normalize(b);
-
+
do_read(b);
}
}
@@ -618,20 +623,31 @@ void buffio_dump_lines(struct buffio *b) {
void buffio_set_fds(struct buffio *b, int ifd, int ofd) {
assert(b && b->ifd == -1 && b->ofd == -1);
- b->ifd = ifd;
- b->ofd = ofd;
+ daemon_nonblock(b->ifd = ifd, 1);
+ daemon_nonblock(b->ofd = ofd, 1);
- daemon_nonblock(b->ifd, 1);
- daemon_nonblock(b->ofd, 1);
+ buffio_set_readable(b, 0);
+ buffio_set_writable(b, 0);
+}
- b->readable = b->writable = 0;
+int buffio_can_write(struct buffio *b, size_t l) {
+ assert(b);
+
+ if (b->output_length >= b->output_range)
+ return 0;
- buffio_set_input_callbacks(b);
- buffio_set_output_callbacks(b);
+ return l <= b->output_range - b->output_length;
}
-int buffio_can_write(struct buffio *b, size_t l) {
+int buffio_write_req(struct buffio *b) {
assert(b);
- return l <= b->output_max_length - b->output_length;
+ return b->output_length < b->output_range;
+}
+
+void buffio_set_prebuf(struct buffio *b, int p) {
+ assert(b);
+
+ b->prebuf = p;
+ do_write(b);
}