summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorLennart Poettering <lennart@poettering.net>2004-06-08 23:54:24 +0000
committerLennart Poettering <lennart@poettering.net>2004-06-08 23:54:24 +0000
commit9cb0b933e260008c6a03e24a4a149f726b8d86b2 (patch)
treeb54651bafe32d1a817e779f884d1628176465bf0 /src
parentb1c00dcd0ae51d201f772e7f5fa61acae436a2cf (diff)
initial commit
git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@3 fefdeb5f-60dc-0310-8127-8f9354f1896f
Diffstat (limited to 'src')
-rw-r--r--src/Makefile10
-rw-r--r--src/client.c32
-rw-r--r--src/client.h21
-rw-r--r--src/core.c81
-rw-r--r--src/core.h21
-rw-r--r--src/idxset.c329
-rw-r--r--src/idxset.h28
-rw-r--r--src/inputstream.c50
-rw-r--r--src/inputstream.h25
-rw-r--r--src/iochannel.c158
-rw-r--r--src/iochannel.h20
-rw-r--r--src/main.c26
-rw-r--r--src/mainloop.c331
-rw-r--r--src/mainloop.h38
-rw-r--r--src/memblock.c67
-rw-r--r--src/memblock.h31
-rw-r--r--src/memblockq.c156
-rw-r--r--src/memblockq.h24
-rw-r--r--src/module.c98
-rw-r--r--src/module.h27
-rw-r--r--src/oss.c30
-rw-r--r--src/outputstream.c41
-rw-r--r--src/outputstream.h22
-rw-r--r--src/packet.c29
-rw-r--r--src/packet.h18
-rw-r--r--src/protocol-native-tcp.c19
-rw-r--r--src/protocol-native-unix.c27
-rw-r--r--src/protocol-native.c49
-rw-r--r--src/protocol-native.h9
-rw-r--r--src/protocol-simple-tcp.c24
-rw-r--r--src/protocol-simple.c173
-rw-r--r--src/protocol-simple.h17
-rw-r--r--src/pstream.c359
-rw-r--r--src/pstream.h22
-rw-r--r--src/queue.c77
-rw-r--r--src/queue.h13
-rw-r--r--src/sample.c80
-rw-r--r--src/sample.h35
-rw-r--r--src/sink-pipe.c155
-rw-r--r--src/sink.c217
-rw-r--r--src/sink.h38
-rw-r--r--src/socket-server.c157
-rw-r--r--src/socket-server.h18
-rw-r--r--src/source.c58
-rw-r--r--src/source.h30
-rw-r--r--src/strbuf.c122
-rw-r--r--src/strbuf.h13
47 files changed, 3425 insertions, 0 deletions
diff --git a/src/Makefile b/src/Makefile
new file mode 100644
index 00000000..366e84e6
--- /dev/null
+++ b/src/Makefile
@@ -0,0 +1,10 @@
+CFLAGS=-Wall -pipe -ansi -D_GNU_SOURCE
+
+all: idxset.o queue.o strbuf.o mainloop.o iochannel.o packet.o \
+ memblock.o sample.o socket-server.o memblockq.o client.o \
+ core.o main.o outputstream.o inputstream.o source.o sink.o \
+ pstream.o protocol-simple.o protocol-simple-tcp.o sink-pipe.o \
+ module.o
+
+clean:
+ rm -f *.o
diff --git a/src/client.c b/src/client.c
new file mode 100644
index 00000000..56d85734
--- /dev/null
+++ b/src/client.c
@@ -0,0 +1,32 @@
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "client.h"
+
+struct client *client_new(struct core *core, const char *protocol_name, char *name) {
+ struct client *c;
+ int r;
+ assert(core);
+
+ c = malloc(sizeof(struct client));
+ assert(c);
+ c->protocol_name = protocol_name;
+ c->name = name ? strdup(name) : NULL;
+ c->kill = NULL;
+ c->userdata = NULL;
+ c->core = core;
+
+ r = idxset_put(core->clients, c, &c->index);
+ assert(c->index != IDXSET_INVALID && r >= 0);
+
+ return c;
+}
+
+void client_free(struct client *c) {
+ assert(c && c->core);
+
+ idxset_remove_by_data(c->core->clients, c, NULL);
+ free(c->name);
+ free(c);
+}
diff --git a/src/client.h b/src/client.h
new file mode 100644
index 00000000..7128a452
--- /dev/null
+++ b/src/client.h
@@ -0,0 +1,21 @@
+#ifndef fooclienthfoo
+#define fooclienthfoo
+
+#include "core.h"
+
+struct client {
+ char *name;
+ uint32_t index;
+
+ const char *protocol_name;
+
+ void *userdata;
+ void (*kill)(struct client *c);
+
+ struct core *core;
+};
+
+struct client *client_new(struct core *c, const char *protocol_name, char *name);
+void client_free(struct client *c);
+
+#endif
diff --git a/src/core.c b/src/core.c
new file mode 100644
index 00000000..7cfa66e3
--- /dev/null
+++ b/src/core.c
@@ -0,0 +1,81 @@
+#include <stdlib.h>
+#include <assert.h>
+#include <stdio.h>
+
+#include "core.h"
+#include "module.h"
+#include "sink.h"
+#include "source.h"
+
+struct core* core_new(struct mainloop *m) {
+ struct core* c;
+ c = malloc(sizeof(struct core));
+ assert(c);
+
+ c->mainloop = m;
+ c->clients = idxset_new(NULL, NULL);
+ c->sinks = idxset_new(NULL, NULL);
+ c->sources = idxset_new(NULL, NULL);
+ c->output_streams = idxset_new(NULL, NULL);
+ c->input_streams = idxset_new(NULL, NULL);
+
+ c->default_source_index = c->default_sink_index = IDXSET_INVALID;
+
+ c->modules = NULL;
+
+ return c;
+};
+
+void core_free(struct core *c) {
+ assert(c);
+
+ module_unload_all(c);
+ assert(!c->modules);
+
+ assert(idxset_isempty(c->clients));
+ idxset_free(c->clients, NULL, NULL);
+
+ assert(idxset_isempty(c->sinks));
+ idxset_free(c->sinks, NULL, NULL);
+
+ assert(idxset_isempty(c->sources));
+ idxset_free(c->sources, NULL, NULL);
+
+ assert(idxset_isempty(c->output_streams));
+ idxset_free(c->output_streams, NULL, NULL);
+
+ assert(idxset_isempty(c->input_streams));
+ idxset_free(c->input_streams, NULL, NULL);
+
+ free(c);
+};
+
+struct sink* core_get_default_sink(struct core *c) {
+ struct sink *sink;
+ assert(c);
+
+ if ((sink = idxset_get_by_index(c->sinks, c->default_sink_index)))
+ return sink;
+
+ if (!(sink = idxset_rrobin(c->sinks, NULL)))
+ return NULL;
+
+ fprintf(stderr, "Default sink vanished, setting to %u\n", sink->index);
+ c->default_sink_index = sink->index;
+ return sink;
+}
+
+struct source* core_get_default_source(struct core *c) {
+ struct source *source;
+ assert(c);
+
+ if ((source = idxset_get_by_index(c->sources, c->default_source_index)))
+ return source;
+
+ if (!(source = idxset_rrobin(c->sources, NULL)))
+ return NULL;
+
+ fprintf(stderr, "Default source vanished, setting to %u\n", source->index);
+ c->default_source_index = source->index;
+ return source;
+}
diff --git a/src/core.h b/src/core.h
new file mode 100644
index 00000000..649c9dba
--- /dev/null
+++ b/src/core.h
@@ -0,0 +1,21 @@
+#ifndef foocorehfoo
+#define foocorehfoo
+
+#include "idxset.h"
+#include "mainloop.h"
+
+struct core {
+ struct mainloop *mainloop;
+
+ struct idxset *clients, *sinks, *sources, *output_streams, *input_streams, *modules;
+
+ uint32_t default_source_index, default_sink_index;
+};
+
+struct core* core_new(struct mainloop *m);
+void core_free(struct core*c);
+
+struct sink* core_get_default_sink(struct core *c);
+struct source* core_get_default_source(struct core *c);
+
+#endif
diff --git a/src/idxset.c b/src/idxset.c
new file mode 100644
index 00000000..eaea34f4
--- /dev/null
+++ b/src/idxset.c
@@ -0,0 +1,329 @@
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "idxset.h"
+
+struct idxset_entry {
+ void *data;
+ uint32_t index;
+ unsigned hash_value;
+
+ struct idxset_entry *hash_prev, *hash_next;
+ struct idxset_entry* iterate_prev, *iterate_next;
+};
+
+struct idxset {
+ unsigned (*hash_func) (void *p);
+ int (*compare_func)(void *a, void *b);
+
+ unsigned hash_table_size, n_entries;
+ struct idxset_entry **hash_table, **array, *iterate_list_head, *iterate_list_tail, *rrobin;
+ uint32_t index, start_index, array_size;
+};
+
+static unsigned trivial_hash_func(void *p) {
+ return (unsigned) p;
+}
+
+static int trivial_compare_func(void *a, void *b) {
+ return !(a == b);
+}
+
+struct idxset* idxset_new(unsigned (*hash_func) (void *p), int (*compare_func) (void*a, void*b)) {
+ struct idxset *s;
+
+ s = malloc(sizeof(struct idxset));
+ assert(s);
+ s->hash_func = hash_func ? hash_func : trivial_hash_func;
+ s->compare_func = compare_func ? compare_func : trivial_compare_func;
+ s->hash_table_size = 1023;
+ s->hash_table = malloc(sizeof(struct idxset_entry*)*s->hash_table_size);
+ assert(s->hash_table);
+ s->array = NULL;
+ s->array_size = 0;
+ s->index = 0;
+ s->start_index = 0;
+ s->n_entries = 0;
+
+ s->iterate_list_head = s->iterate_list_tail = NULL;
+
+ return s;
+}
+
+void idxset_free(struct idxset *s, void (*free_func) (void *p, void *userdata), void *userdata) {
+ assert(s);
+
+ if (free_func) {
+ while (s->iterate_list_head) {
+ struct idxset_entry *e = s->iterate_list_head;
+ s->iterate_list_head = s->iterate_list_head->iterate_next;
+
+ if (free_func)
+ free_func(e->data, userdata);
+ free(e);
+ }
+ }
+
+ free(s->hash_table);
+ free(s->array);
+ free(s);
+}
+
+static struct idxset_entry* hash_scan(struct idxset *s, struct idxset_entry* e, void *p) {
+ assert(p);
+
+ assert(s->compare_func);
+ for (; e; e = e->hash_next)
+ if (s->compare_func(e->data, p))
+ return e;
+
+ return NULL;
+}
+
+static void extend_array(struct idxset *s, uint32_t index) {
+ uint32_t i, j, l;
+ struct idxset_entry** n;
+ assert(index >= s->start_index );
+
+ if (index <= s->start_index + s->array_size)
+ return;
+
+ for (i = 0; i < s->array_size; i++)
+ if (s->array[i])
+ break;
+
+ l = index - s->start_index - i + 100;
+ n = malloc(sizeof(struct hash_table_entry*)*l);
+ assert(n);
+ memset(n, 0, sizeof(struct hash_table_entry*)*l);
+
+ for (j = 0; j < s->array_size-i; j++)
+ n[j] = s->array[i+j];
+
+ free(s->array);
+
+ s->array = n;
+ s->array_size = l;
+ s->start_index += i;
+}
+
+static struct idxset_entry** array_index(struct idxset*s, uint32_t index) {
+
+ if (index >= s->start_index + s->array_size)
+ return NULL;
+
+ if (index < s->start_index)
+ return NULL;
+
+ return s->array + (index - s->start_index);
+}
+
+int idxset_put(struct idxset*s, void *p, uint32_t *index) {
+ unsigned h;
+ struct idxset_entry *e, **a;
+ assert(s && p);
+
+ assert(s->hash_func);
+ h = s->hash_func(p) % s->hash_table_size;
+
+ assert(s->hash_table);
+ if ((e = hash_scan(s, s->hash_table[h], p))) {
+ if (index)
+ *index = e->index;
+
+ return -1;
+ }
+
+ e = malloc(sizeof(struct idxset_entry));
+ assert(e);
+
+ e->data = p;
+ e->index = s->index++;
+ e->hash_value = h;
+
+ /* Insert into hash table */
+ e->hash_next = s->hash_table[h];
+ e->hash_prev = NULL;
+ if (s->hash_table[h])
+ s->hash_table[h]->hash_prev = e;
+ s->hash_table[h] = e;
+
+ /* Insert into array */
+ extend_array(s, s->index);
+ a = array_index(s, s->index);
+ assert(a && !*a);
+ *a = e;
+
+ /* Insert into linked list */
+ e->iterate_next = NULL;
+ e->iterate_prev = s->iterate_list_tail;
+ if (s->iterate_list_tail) {
+ assert(s->iterate_list_head);
+ s->iterate_list_tail->iterate_next = e;
+ } else {
+ assert(!s->iterate_list_head);
+ s->iterate_list_head = e;
+ }
+ s->iterate_list_tail = e;
+
+ s->n_entries++;
+ assert(s->n_entries >= 1);
+
+ if (index)
+ *index = e->index;
+
+ return 0;
+}
+
+void* idxset_get_by_index(struct idxset*s, uint32_t index) {
+ struct idxset_entry **a;
+ assert(s);
+
+ if (!(a = array_index(s, index)))
+ return NULL;
+
+ return (*a)->data;
+}
+
+void* idxset_get_by_data(struct idxset*s, void *p, uint32_t *index) {
+ unsigned h;
+ struct idxset_entry *e;
+ assert(s && p);
+
+ assert(s->hash_func);
+ h = s->hash_func(p) % s->hash_table_size;
+
+ assert(s->hash_table);
+ if (!(e = hash_scan(s, s->hash_table[h], p)))
+ return NULL;
+
+ if (index)
+ *index = e->index;
+
+ return e->data;
+}
+
+static void remove_entry(struct idxset *s, struct idxset_entry *e) {
+ struct idxset_entry **a;
+ assert(s && e);
+
+ /* Remove from array */
+ a = array_index(s, s->index);
+ assert(a && *a == e);
+ *a = NULL;
+
+ /* Remove from linked list */
+ if (e->iterate_next)
+ e->iterate_next->iterate_prev = e->iterate_prev;
+ else
+ s->iterate_list_tail = e->iterate_prev;
+
+ if (e->iterate_prev)
+ e->iterate_prev->iterate_next = e->iterate_next;
+ else
+ s->iterate_list_head = e->iterate_next;
+
+ /* Remove from hash table */
+ if (e->hash_next)
+ e->hash_next->hash_prev = e->hash_prev;
+
+ if (e->hash_prev)
+ e->hash_prev->hash_next = e->hash_next;
+ else
+ s->hash_table[e->hash_value] = e->hash_next;
+
+ if (s->rrobin == e)
+ s->rrobin = NULL;
+
+ free(e);
+
+ assert(s->n_entries >= 1);
+ s->n_entries--;
+}
+
+void* idxset_remove_by_index(struct idxset*s, uint32_t index) {
+ struct idxset_entry **a;
+ void *data;
+
+ assert(s);
+
+ if (!(a = array_index(s, index)))
+ return NULL;
+
+ data = (*a)->data;
+ remove_entry(s, *a);
+
+ return data;
+}
+
+void* idxset_remove_by_data(struct idxset*s, void *data, uint32_t *index) {
+ struct idxset_entry *e;
+ unsigned h;
+
+ assert(s->hash_func);
+ h = s->hash_func(data) % s->hash_table_size;
+
+ assert(s->hash_table);
+ if (!(e = hash_scan(s, s->hash_table[h], data)))
+ return NULL;
+
+ data = e->data;
+ if (index)
+ *index = e->index;
+
+ remove_entry(s, e);
+
+ return data;
+}
+
+void* idxset_rrobin(struct idxset *s, uint32_t *index) {
+ assert(s && index);
+
+ if (s->rrobin)
+ s->rrobin = s->rrobin->iterate_next;
+
+ if (!s->rrobin)
+ s->rrobin = s->iterate_list_head;
+
+ if (!s->rrobin)
+ return NULL;
+
+ if (index)
+ *index = s->rrobin->index;
+
+ return s->rrobin->data;
+}
+
+int idxset_foreach(struct idxset*s, int (*func)(void *p, uint32_t index, int *del, void*userdata), void *userdata) {
+ struct idxset_entry *e;
+ assert(s && func);
+
+ e = s->iterate_list_head;
+ while (e) {
+ int del = 0, r;
+ struct idxset_entry *n = e->iterate_next;
+
+ r = func(e->data, e->index, &del, userdata);
+
+ if (del)
+ remove_entry(s, e);
+
+ if (r < 0)
+ return r;
+
+ e = n;
+ }
+
+ return 0;
+}
+
+unsigned idxset_ncontents(struct idxset*s) {
+ assert(s);
+ return s->n_entries;
+}
+
+int idxset_isempty(struct idxset *s) {
+ assert(s);
+ return s->n_entries == 0;
+}
diff --git a/src/idxset.h b/src/idxset.h
new file mode 100644
index 00000000..f649e23e
--- /dev/null
+++ b/src/idxset.h
@@ -0,0 +1,28 @@
+#ifndef fooidxsethfoo
+#define fooidxsethfoo
+
+#include <inttypes.h>
+
+#define IDXSET_INVALID ((uint32_t) -1)
+
+struct idxset;
+
+struct idxset* idxset_new(unsigned (*hash_func) (void *p), int (*compare_func) (void*a, void*b));
+void idxset_free(struct idxset *s, void (*free_func) (void *p, void *userdata), void *userdata);
+
+int idxset_put(struct idxset*s, void *p, uint32_t *index);
+
+void* idxset_get_by_index(struct idxset*s, uint32_t index);
+void* idxset_get_by_data(struct idxset*s, void *p, uint32_t *index);
+
+void* idxset_remove_by_index(struct idxset*s, uint32_t index);
+void* idxset_remove_by_data(struct idxset*s, void *p, uint32_t *index);
+
+void* idxset_rrobin(struct idxset *s, uint32_t *index);
+
+int idxset_foreach(struct idxset*s, int (*func)(void *p, uint32_t index, int *del, void*userdata), void *userdata);
+
+unsigned idxset_ncontents(struct idxset*s);
+int idxset_isempty(struct idxset *s);
+
+#endif
diff --git a/src/inputstream.c b/src/inputstream.c
new file mode 100644
index 00000000..c7b4b4c7
--- /dev/null
+++ b/src/inputstream.c
@@ -0,0 +1,50 @@
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "inputstream.h"
+
+struct input_stream* input_stream_new(struct sink *s, struct sample_spec *spec, const char *name) {
+ struct input_stream *i;
+ int r;
+ assert(s && spec);
+
+ i = malloc(sizeof(struct input_stream));
+ assert(i);
+ i->name = name ? strdup(name) : NULL;
+ i->sink = s;
+ i->spec = *spec;
+
+ i->memblockq = memblockq_new(bytes_per_second(spec)*5, sample_size(spec));
+ assert(i->memblockq);
+
+ assert(s->core);
+ r = idxset_put(s->core->input_streams, i, &i->index);
+ assert(r == 0 && i->index != IDXSET_INVALID);
+ r = idxset_put(s->input_streams, i, NULL);
+ assert(r == 0);
+
+ return i;
+}
+
+void input_stream_free(struct input_stream* i) {
+ assert(i);
+
+ memblockq_free(i->memblockq);
+
+ assert(i->sink && i->sink->core);
+ idxset_remove_by_data(i->sink->core->input_streams, i, NULL);
+ idxset_remove_by_data(i->sink->input_streams, i, NULL);
+
+ free(i->name);
+ free(i);
+}
+
+void input_stream_notify(struct input_stream *i) {
+ assert(i);
+
+ if (memblockq_is_empty(i->memblockq))
+ return;
+
+ sink_notify(i->sink);
+}
diff --git a/src/inputstream.h b/src/inputstream.h
new file mode 100644
index 00000000..0353799e
--- /dev/null
+++ b/src/inputstream.h
@@ -0,0 +1,25 @@
+#ifndef fooinputstreamhfoo
+#define fooinputstreamhfoo
+
+#include <inttypes.h>
+
+#include "sink.h"
+#include "sample.h"
+#include "memblockq.h"
+
+struct input_stream {
+ char *name;
+ uint32_t index;
+
+ struct sink *sink;
+ struct sample_spec spec;
+
+ struct memblockq *memblockq;
+};
+
+struct input_stream* input_stream_new(struct sink *s, struct sample_spec *spec, const char *name);
+void input_stream_free(struct input_stream* i);
+
+void input_stream_notify(struct input_stream *i);
+
+#endif
diff --git a/src/iochannel.c b/src/iochannel.c
new file mode 100644
index 00000000..db9717a9
--- /dev/null
+++ b/src/iochannel.c
@@ -0,0 +1,158 @@
+#include <assert.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "iochannel.h"
+
+struct iochannel {
+ int ifd, ofd;
+ struct mainloop* mainloop;
+
+ void (*callback)(struct iochannel*io, void *userdata);
+ void*userdata;
+
+ int readable;
+ int writable;
+
+ struct mainloop_source* input_source, *output_source;
+};
+
+static void enable_mainloop_sources(struct iochannel *io) {
+ assert(io);
+
+ if (io->input_source == io->output_source) {
+ enum mainloop_io_event e = MAINLOOP_IO_EVENT_NULL;
+ assert(io->input_source);
+
+ if (!io->readable)
+ e |= MAINLOOP_IO_EVENT_IN;
+ if (!io->writable)
+ e |= MAINLOOP_IO_EVENT_OUT;
+
+ mainloop_source_io_set_events(io->input_source, e);
+ } else {
+ if (io->input_source)
+ mainloop_source_io_set_events(io->input_source, io->readable ? MAINLOOP_IO_EVENT_NULL : MAINLOOP_IO_EVENT_IN);
+ if (io->output_source)
+ mainloop_source_io_set_events(io->output_source, io->writable ? MAINLOOP_IO_EVENT_NULL : MAINLOOP_IO_EVENT_OUT);
+ }
+}
+
+static void callback(struct mainloop_source*s, int fd, enum mainloop_io_event events, void *userdata) {
+ struct iochannel *io = userdata;
+ int changed;
+ assert(s && fd >= 0 && userdata);
+
+ if (events & MAINLOOP_IO_EVENT_IN && !io->readable) {
+ io->readable = 1;
+ changed = 1;
+ }
+
+ if (events & MAINLOOP_IO_EVENT_OUT && !io->writable) {
+ io->writable = 1;
+ changed = 1;
+ }
+
+ if (changed) {
+ enable_mainloop_sources(io);
+
+ if (io->callback)
+ io->callback(io, io->userdata);
+ }
+}
+
+static void make_nonblock_fd(int fd) {
+ int v;
+
+ if ((v = fcntl(fd, F_GETFL)) >= 0)
+ if (!(v & O_NONBLOCK))
+ fcntl(fd, F_SETFL, v|O_NONBLOCK);
+}
+
+struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd) {
+ struct iochannel *io;
+ assert(m && (ifd >= 0 || ofd >= 0));
+
+ io = malloc(sizeof(struct iochannel));
+ io->ifd = ifd;
+ io->ofd = ofd;
+ io->mainloop = m;
+
+ io->userdata = NULL;
+ io->callback = NULL;
+ io->readable = 0;
+ io->writable = 0;
+
+ if (ifd == ofd) {
+ assert(ifd >= 0);
+ make_nonblock_fd(io->ifd);
+ io->input_source = io->output_source = mainloop_source_new_io(m, ifd, MAINLOOP_IO_EVENT_IN|MAINLOOP_IO_EVENT_OUT, callback, io);
+ } else {
+
+ if (ifd >= 0) {
+ make_nonblock_fd(io->ifd);
+ io->input_source = mainloop_source_new_io(m, ifd, MAINLOOP_IO_EVENT_IN, callback, io);
+ } else
+ io->input_source = NULL;
+
+ if (ofd >= 0) {
+ make_nonblock_fd(io->ofd);
+ io->output_source = mainloop_source_new_io(m, ofd, MAINLOOP_IO_EVENT_OUT, callback, io);
+ } else
+ io->output_source = NULL;
+ }
+
+ return io;
+}
+
+void iochannel_free(struct iochannel*io) {
+ assert(io);
+
+ if (io->ifd >= 0)
+ close(io->ifd);
+ if (io->ofd >= 0 && io->ofd != io->ifd)
+ close(io->ofd);
+
+ if (io->input_source)
+ mainloop_source_free(io->input_source);
+ if (io->output_source)
+ mainloop_source_free(io->output_source);
+
+ free(io);
+}
+
+int iochannel_is_readable(struct iochannel*io) {
+ assert(io);
+ return io->readable;
+}
+
+int iochannel_is_writable(struct iochannel*io) {
+ assert(io);
+ return io->writable;
+}
+
+ssize_t iochannel_write(struct iochannel*io, const void*data, size_t l) {
+ ssize_t r;
+ assert(io && data && l && io->ofd >= 0);
+
+ if ((r = write(io->ofd, data, l)) >= 0) {
+ io->writable = 0;
+ enable_mainloop_sources(io);
+ }
+
+ return r;
+}
+
+ssize_t iochannel_read(struct iochannel*io, void*data, size_t l) {
+ ssize_t r;
+
+ assert(io && data && l && io->ifd >= 0);
+
+ if ((r = read(io->ifd, data, l)) >= 0) {
+ io->readable = 0;
+ enable_mainloop_sources(io);
+ }
+
+ return r;
+}
diff --git a/src/iochannel.h b/src/iochannel.h
new file mode 100644
index 00000000..f97fabba
--- /dev/null
+++ b/src/iochannel.h
@@ -0,0 +1,20 @@
+#ifndef fooiochannelhfoo
+#define fooiochannelhfoo
+
+#include <sys/types.h>
+#include "mainloop.h"
+
+struct iochannel;
+
+struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd);
+void iochannel_free(struct iochannel*io);
+
+ssize_t iochannel_write(struct iochannel*io, const void*data, size_t l);
+ssize_t iochannel_read(struct iochannel*io, void*data, size_t l);
+
+int iochannel_is_readable(struct iochannel*io);
+int iochannel_is_writable(struct iochannel*io);
+
+void iochannel_set_callback(struct iochannel*io, void (*callback)(struct iochannel*io, void *userdata), void *userdata);
+
+#endif
diff --git a/src/main.c b/src/main.c
new file mode 100644
index 00000000..3104c264
--- /dev/null
+++ b/src/main.c
@@ -0,0 +1,26 @@
+#include <stddef.h>
+#include <assert.h>
+
+#include "core.h"
+#include "mainloop.h"
+#include "module.h"
+
+int main(int argc, char *argv[]) {
+ struct mainloop *m;
+ struct core *c;
+
+ m = mainloop_new();
+ assert(m);
+ c = core_new(m);
+ assert(c);
+
+ module_load(c, "sink-pipe", NULL);
+ module_load(c, "protocol-simple-tcp", NULL);
+
+ mainloop_run(m);
+
+ core_free(c);
+ mainloop_free(m);
+
+ return 0;
+}
diff --git a/src/mainloop.c b/src/mainloop.c
new file mode 100644
index 00000000..d043ce90
--- /dev/null
+++ b/src/mainloop.c
@@ -0,0 +1,331 @@
+#include <sys/poll.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+
+#include "mainloop.h"
+
+struct mainloop_source {
+ struct mainloop_source *next;
+ struct mainloop *mainloop;
+ enum mainloop_source_type type;
+
+ int enabled;
+ int dead;
+ void *userdata;
+
+ struct {
+ int fd;
+ enum mainloop_io_event events;
+ void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata);
+ struct pollfd pollfd;
+ } io;
+
+ struct {
+ void (*callback)(struct mainloop_source*s, void *userdata);
+ } prepare;
+
+ struct {
+ void (*callback)(struct mainloop_source*s, void *userdata);
+ } idle;
+};
+
+struct mainloop_source_list {
+ struct mainloop_source *sources;
+ int n_sources;
+ int dead_sources;
+};
+
+struct mainloop {
+ struct mainloop_source_list io_sources, prepare_sources, idle_sources;
+
+ struct pollfd *pollfds;
+ int max_pollfds, n_pollfds;
+ int rebuild_pollfds;
+
+ int quit;
+ int running;
+};
+
+struct mainloop *mainloop_new(void) {
+ struct mainloop *m;
+
+ m = malloc(sizeof(struct mainloop));
+ assert(m);
+ memset(m, 0, sizeof(struct mainloop));
+
+ return m;
+}
+
+static void free_sources(struct mainloop_source_list *l, int all) {
+ struct mainloop_source *s, *p;
+ assert(l);
+
+ if (!l->dead_sources)
+ return;
+
+ p = NULL;
+ s = l->sources;
+ while (s) {
+ if (all || s->dead) {
+ struct mainloop_source *t = s;
+ s = s->next;
+
+ if (p)
+ p->next = s;
+ else
+ l->sources = s;
+
+ free(t);
+ } else {
+ p = s;
+ s = s->next;
+ }
+ }
+
+ l->dead_sources = 0;
+
+ if (all) {
+ assert(l->sources);
+ l->n_sources = 0;
+ }
+}
+
+void mainloop_free(struct mainloop* m) {
+ assert(m);
+ free_sources(&m->io_sources, 1);
+ free_sources(&m->prepare_sources, 1);
+ free_sources(&m->idle_sources, 1);
+ free(m->pollfds);
+}
+
+static void rebuild_pollfds(struct mainloop *m) {
+ struct mainloop_source*s;
+ struct pollfd *p;
+
+ if (m->max_pollfds < m->io_sources.n_sources) {
+ m->max_pollfds = m->io_sources.n_sources*2;
+ m->pollfds = realloc(m->pollfds, sizeof(struct pollfd)*m->max_pollfds);
+ }
+
+ m->n_pollfds = 0;
+ p = m->pollfds;
+ for (s = m->io_sources.sources; s; s = s->next) {
+ assert(s->type == MAINLOOP_SOURCE_TYPE_IO);
+ if (!s->dead && s->enabled && s->io.events != MAINLOOP_IO_EVENT_NULL) {
+ *(p++) = s->io.pollfd;
+ m->n_pollfds++;
+ }
+ }
+}
+
+static void dispatch_pollfds(struct mainloop *m) {
+ int i;
+ struct pollfd *p;
+ struct mainloop_source *s;
+ /* This loop assumes that m->sources and m->pollfds have the same
+ * order and that m->pollfds is a subset of m->sources! */
+
+ s = m->io_sources.sources;
+ for (p = m->pollfds, i = 0; i < m->n_pollfds; p++, i++) {
+ for (;;) {
+ assert(s && s->type == MAINLOOP_SOURCE_TYPE_IO);
+
+ if (p->fd == s->io.fd) {
+ if (!s->dead && s->enabled) {
+ enum mainloop_io_event e = (p->revents & POLLIN ? MAINLOOP_IO_EVENT_IN : 0) | (p->revents & POLLOUT ? MAINLOOP_IO_EVENT_OUT : 0);
+ if (e) {
+ assert(s->io.callback);
+ s->io.callback(s, s->io.fd, e, s->userdata);
+ }
+ }
+
+ break;
+ }
+ s = s->next;
+ }
+ }
+}
+
+int mainloop_iterate(struct mainloop *m, int block) {
+ struct mainloop_source *s;
+ int c;
+ assert(m && !m->running);
+
+ if(m->quit)
+ return m->quit;
+
+ free_sources(&m->io_sources, 0);
+ free_sources(&m->prepare_sources, 0);
+ free_sources(&m->idle_sources, 0);
+
+ for (s = m->prepare_sources.sources; s; s = s->next) {
+ assert(!s->dead && s->type == MAINLOOP_SOURCE_TYPE_PREPARE);
+ if (s->enabled) {
+ assert(s->prepare.callback);
+ s->prepare.callback(s, s->userdata);
+ }
+ }
+
+ if (m->rebuild_pollfds)
+ rebuild_pollfds(m);
+
+ m->running = 1;
+
+ if ((c = poll(m->pollfds, m->n_pollfds, (block && !m->idle_sources.n_sources) ? -1 : 0)) > 0)
+ dispatch_pollfds(m);
+ else if (c == 0) {
+ for (s = m->idle_sources.sources; s; s = s->next) {
+ assert(!s->dead && s->type == MAINLOOP_SOURCE_TYPE_IDLE);
+ if (s->enabled) {
+ assert(s->idle.callback);
+ s->idle.callback(s, s->userdata);
+ }
+ }
+ }
+
+ m->running = 0;
+ return c < 0 ? -1 : 0;
+}
+
+int mainloop_run(struct mainloop *m) {
+ int r;
+ while (!(r = mainloop_iterate(m, 1)));
+ return r;
+}
+
+void mainloop_quit(struct mainloop *m, int r) {
+ assert(m);
+ m->quit = r;
+}
+
+static struct mainloop_source_list* get_source_list(struct mainloop *m, enum mainloop_source_type type) {
+ struct mainloop_source_list *l;
+
+ switch(type) {
+ case MAINLOOP_SOURCE_TYPE_IO:
+ l = &m->io_sources;
+ break;
+ case MAINLOOP_SOURCE_TYPE_PREPARE:
+ l = &m->prepare_sources;
+ break;
+ case MAINLOOP_SOURCE_TYPE_IDLE:
+ l = &m->idle_sources;
+ break;
+ default:
+ l = NULL;
+ break;
+ }
+
+ return l;
+}
+
+static struct mainloop_source *source_new(struct mainloop*m, enum mainloop_source_type type) {
+ struct mainloop_source_list *l;
+ struct mainloop_source* s;
+ assert(m);
+
+ s = malloc(sizeof(struct mainloop_source));
+ assert(s);
+ memset(s, 0, sizeof(struct mainloop_source));
+
+ s->type = type;
+ s->mainloop = m;
+
+ l = get_source_list(m, type);
+ assert(l);
+
+ s->next = l->sources;
+ l->sources = s;
+ l->n_sources++;
+ return s;
+}
+
+struct mainloop_source* mainloop_source_new_io(struct mainloop*m, int fd, enum mainloop_io_event event, void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata), void *userdata) {
+ struct mainloop_source* s;
+ assert(m && fd>=0 && callback);
+
+ s = source_new(m, MAINLOOP_SOURCE_TYPE_IO);
+
+ s->io.fd = fd;
+ s->io.events = event;
+ s->io.callback = callback;
+ s->userdata = userdata;
+ s->io.pollfd.fd = fd;
+ s->io.pollfd.events = (event & MAINLOOP_IO_EVENT_IN ? POLLIN : 0) | (event & MAINLOOP_IO_EVENT_OUT ? POLLOUT : 0);
+ s->io.pollfd.revents = 0;
+
+ s->enabled = 1;
+
+ m->rebuild_pollfds = 1;
+ return s;
+}
+
+struct mainloop_source* mainloop_source_new_prepare(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata) {
+ struct mainloop_source* s;
+ assert(m && callback);
+
+ s = source_new(m, MAINLOOP_SOURCE_TYPE_PREPARE);
+
+ s->prepare.callback = callback;
+ s->userdata = userdata;
+ s->enabled = 1;
+ return s;
+}
+
+struct mainloop_source* mainloop_source_new_idle(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata) {
+ struct mainloop_source* s;
+ assert(m && callback);
+
+ s = source_new(m, MAINLOOP_SOURCE_TYPE_IDLE);
+
+ s->prepare.callback = callback;
+ s->userdata = userdata;
+ s->enabled = 1;
+ return s;
+}
+
+void mainloop_source_free(struct mainloop_source*s) {
+ struct mainloop_source_list *l;
+ assert(s && !s->dead);
+ s->dead = 1;
+
+ assert(s->mainloop);
+ l = get_source_list(s->mainloop, s->type);
+ assert(l);
+
+ l->n_sources--;
+ l->dead_sources = 1;
+
+ if (s->type == MAINLOOP_SOURCE_TYPE_IO)
+ s->mainloop->rebuild_pollfds = 1;
+}
+
+void mainloop_source_enable(struct mainloop_source*s, int b) {
+ assert(s && !s->dead);
+
+ if (s->type == MAINLOOP_SOURCE_TYPE_IO && ((s->enabled && !b) || (!s->enabled && b))) {
+ assert(s->mainloop);
+ s->mainloop->rebuild_pollfds = 1;
+ }
+
+ s->enabled = b;
+}
+
+void mainloop_source_io_set_events(struct mainloop_source*s, enum mainloop_io_event events) {
+ assert(s && !s->dead && s->type == MAINLOOP_SOURCE_TYPE_IO);
+
+ if ((s->io.events && !events) || (!s->io.events && events)) {
+ assert(s->mainloop);
+ s->mainloop->rebuild_pollfds = 1;
+ }
+
+ s->io.events = events;
+ s->io.pollfd.events = ((events & MAINLOOP_IO_EVENT_IN) ? POLLIN : 0) | ((events & MAINLOOP_IO_EVENT_OUT) ? POLLOUT : 0);
+}
+
+struct mainloop *mainloop_source_get_mainloop(struct mainloop_source *s) {
+ assert(s);
+
+ return s->mainloop;
+}
diff --git a/src/mainloop.h b/src/mainloop.h
new file mode 100644
index 00000000..72376c72
--- /dev/null
+++ b/src/mainloop.h
@@ -0,0 +1,38 @@
+#ifndef foomainloophfoo
+#define foomainloophfoo
+
+struct mainloop;
+struct mainloop_source;
+
+enum mainloop_io_event {
+ MAINLOOP_IO_EVENT_NULL = 0,
+ MAINLOOP_IO_EVENT_IN = 1,
+ MAINLOOP_IO_EVENT_OUT = 2,
+ MAINLOOP_IO_EVENT_BOTH = 3
+};
+
+enum mainloop_source_type {
+ MAINLOOP_SOURCE_TYPE_IO,
+ MAINLOOP_SOURCE_TYPE_PREPARE,
+ MAINLOOP_SOURCE_TYPE_IDLE
+};
+
+struct mainloop *mainloop_new(void);
+void mainloop_free(struct mainloop* m);
+
+int mainloop_iterate(struct mainloop *m, int block);
+int mainloop_run(struct mainloop *m);
+void mainloop_quit(struct mainloop *m, int r);
+
+struct mainloop_source* mainloop_source_new_io(struct mainloop*m, int fd, enum mainloop_io_event event, void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata), void *userdata);
+struct mainloop_source* mainloop_source_new_prepare(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata);
+struct mainloop_source* mainloop_source_new_idle(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata);
+
+void mainloop_source_free(struct mainloop_source*s);
+void mainloop_source_enable(struct mainloop_source*s, int b);
+
+void mainloop_source_io_set_events(struct mainloop_source*s, enum mainloop_io_event event);
+
+struct mainloop *mainloop_source_get_mainloop(struct mainloop_source *s);
+
+#endif
diff --git a/src/memblock.c b/src/memblock.c
new file mode 100644
index 00000000..3bef4944
--- /dev/null
+++ b/src/memblock.c
@@ -0,0 +1,67 @@
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+
+#include "memblock.h"
+
+struct memblock *memblock_new(size_t length) {
+ struct memblock *b = malloc(sizeof(struct memblock)+length);
+ b->type = MEMBLOCK_APPENDED;
+ b->ref = 1;
+ b->length = length;
+ b->data = b+1;
+ return b;
+}
+
+struct memblock *memblock_new_fixed(void *d, size_t length) {
+ struct memblock *b = malloc(sizeof(struct memblock));
+ b->type = MEMBLOCK_FIXED;
+ b->ref = 1;
+ b->length = length;
+ b->data = d;
+ return b;
+}
+
+struct memblock *memblock_new_dynamic(void *d, size_t length) {
+ struct memblock *b = malloc(sizeof(struct memblock));
+ b->type = MEMBLOCK_DYNAMIC;
+ b->ref = 1;
+ b->length = length;
+ b->data = d;
+ return b;
+}
+
+struct memblock* memblock_ref(struct memblock*b) {
+ assert(b && b->ref >= 1);
+ b->ref++;
+ return b;
+}
+
+void memblock_unref(struct memblock*b) {
+ assert(b && b->ref >= 1);
+ b->ref--;
+
+ if (b->ref == 0) {
+ if (b->type == MEMBLOCK_DYNAMIC)
+ free(b->data);
+ free(b);
+ }
+}
+
+void memblock_unref_fixed(struct memblock *b) {
+ void *d;
+
+ assert(b && b->ref >= 1);
+
+ if (b->ref == 1) {
+ memblock_unref(b);
+ return;
+ }
+
+ d = malloc(b->length);
+ assert(d);
+ memcpy(d, b->data, b->length);
+ b->data = d;
+ b->type = MEMBLOCK_DYNAMIC;
+}
+
diff --git a/src/memblock.h b/src/memblock.h
new file mode 100644
index 00000000..48e87286
--- /dev/null
+++ b/src/memblock.h
@@ -0,0 +1,31 @@
+#ifndef foomemblockhfoo
+#define foomemblockhfoo
+
+#include <sys/types.h>
+
+enum memblock_type { MEMBLOCK_FIXED, MEMBLOCK_APPENDED, MEMBLOCK_DYNAMIC };
+
+struct memblock {
+ enum memblock_type type;
+ unsigned ref;
+ size_t length;
+ void *data;
+};
+
+struct memchunk {
+ struct memblock *memblock;
+ size_t index, length;
+};
+
+struct memblock *memblock_new(size_t length);
+struct memblock *memblock_new_fixed(void *data, size_t length);
+struct memblock *memblock_new_dynamic(void *data, size_t length);
+
+void memblock_unref(struct memblock*b);
+struct memblock* memblock_ref(struct memblock*b);
+
+void memblock_unref_fixed(struct memblock*b);
+
+#define memblock_assert_exclusive(b) assert((b)->ref == 1)
+
+#endif
diff --git a/src/memblockq.c b/src/memblockq.c
new file mode 100644
index 00000000..1424c556
--- /dev/null
+++ b/src/memblockq.c
@@ -0,0 +1,156 @@
+#include <assert.h>
+#include <stdlib.h>
+
+#include "memblockq.h"
+
+struct memblock_list {
+ struct memblock_list *next;
+ struct memchunk chunk;
+};
+
+struct memblockq {
+ struct memblock_list *blocks, *blocks_tail;
+ unsigned n_blocks;
+ size_t total_length;
+ size_t maxlength;
+ size_t base;
+};
+
+struct memblockq* memblockq_new(size_t maxlength, size_t base) {
+ struct memblockq* bq;
+ assert(maxlength && base);
+
+ bq = malloc(sizeof(struct memblockq));
+ assert(bq);
+ bq->blocks = bq->blocks_tail = 0;
+ bq->n_blocks = 0;
+ bq->total_length = 0;
+ bq->base = base;
+ bq->maxlength = ((maxlength+base-1)/base)*base;
+ assert(bq->maxlength >= base);
+ return bq;
+}
+
+void memblockq_free(struct memblockq* bq) {
+ struct memblock_list *l;
+ assert(bq);
+
+ while ((l = bq->blocks)) {
+ bq->blocks = l->next;
+ memblock_unref(l->chunk.memblock);
+ free(l);
+ }
+
+ free(bq);
+}
+
+void memblockq_push(struct memblockq* bq, struct memchunk *chunk, size_t delta) {
+ struct memblock_list *q;
+ assert(bq && chunk && chunk->memblock && chunk->index);
+
+ q = malloc(sizeof(struct memblock_list));
+ assert(q);
+
+ q->chunk = *chunk;
+ memblock_ref(q->chunk.memblock);
+ assert(q->chunk.index+q->chunk.length <= q->chunk.memblock->length);
+ q->next = NULL;
+
+ if (bq->blocks_tail)
+ bq->blocks_tail->next = q;
+ else
+ bq->blocks = q;
+
+ bq->blocks_tail = q;
+
+ bq->n_blocks++;
+ bq->total_length += chunk->length;
+
+ memblockq_shorten(bq, bq->maxlength);
+}
+
+int memblockq_peek(struct memblockq* bq, struct memchunk *chunk) {
+ assert(bq && chunk);
+
+ if (!bq->blocks)
+ return -1;
+
+ *chunk = bq->blocks->chunk;
+ memblock_ref(chunk->memblock);
+ return 0;
+}
+
+int memblockq_pop(struct memblockq* bq, struct memchunk *chunk) {
+ struct memblock_list *q;
+
+ assert(bq && chunk);
+
+ if (!bq->blocks)
+ return -1;
+
+ q = bq->blocks;
+ bq->blocks = bq->blocks->next;
+
+ *chunk = q->chunk;
+
+ bq->n_blocks--;
+ bq->total_length -= chunk->length;
+
+ free(q);
+ return 0;
+}
+
+void memblockq_drop(struct memblockq *bq, size_t length) {
+ assert(bq);
+
+ while (length > 0) {
+ size_t l = length;
+ assert(bq->blocks && bq->total_length >= length);
+
+ if (l > bq->blocks->chunk.length)
+ l = bq->blocks->chunk.length;
+
+ bq->blocks->chunk.index += l;
+ bq->blocks->chunk.length -= l;
+ bq->total_length -= l;
+
+ if (bq->blocks->chunk.length == 0) {
+ struct memblock_list *q;
+
+ q = bq->blocks;
+ bq->blocks = bq->blocks->next;
+ memblock_unref(q->chunk.memblock);
+ free(q);
+
+ bq->n_blocks--;
+ }
+
+ length -= l;
+ }
+}
+
+void memblockq_shorten(struct memblockq *bq, size_t length) {
+ size_t l;
+ assert(bq);
+
+ if (bq->total_length <= length)
+ return;
+
+ l = bq->total_length - length;
+ l /= bq->base;
+ l *= bq->base;
+
+ memblockq_drop(bq, l);
+}
+
+
+void memblockq_empty(struct memblockq *bq) {
+ assert(bq);
+ memblockq_shorten(bq, 0);
+}
+
+int memblockq_is_empty(struct memblockq *bq) {
+ assert(bq);
+
+ return bq->total_length >= bq->base;
+}
diff --git a/src/memblockq.h b/src/memblockq.h
new file mode 100644
index 00000000..75c5e59e
--- /dev/null
+++ b/src/memblockq.h
@@ -0,0 +1,24 @@
+#ifndef foomemblockqhfoo
+#define foomemblockqhfoo
+
+#include <sys/types.h>
+
+#include "memblock.h"
+
+struct memblockq;
+
+struct memblockq* memblockq_new(size_t maxlength, size_t base);
+void memblockq_free(struct memblockq* bq);
+
+void memblockq_push(struct memblockq* bq, struct memchunk *chunk, size_t delta);
+
+int memblockq_pop(struct memblockq* bq, struct memchunk *chunk);
+int memblockq_peek(struct memblockq* bq, struct memchunk *chunk);
+void memblockq_drop(struct memblockq *bq, size_t length);
+
+void memblockq_shorten(struct memblockq *bq, size_t length);
+void memblockq_empty(struct memblockq *bq);
+
+int memblockq_is_empty(struct memblockq *bq);
+
+#endif
diff --git a/src/module.c b/src/module.c
new file mode 100644
index 00000000..bcd0b6c0
--- /dev/null
+++ b/src/module.c
@@ -0,0 +1,98 @@
+#include <stdlib.h>
+#include <assert.h>
+
+#include "module.h"
+
+struct module* module_load(struct core *c, const char *name, const char *argument) {
+ struct module *m = NULL;
+
+ assert(c && name);
+
+ m = malloc(sizeof(struct module));
+ assert(m);
+
+ if (!(m->dl = lt_dlopenext(name)))
+ goto fail;
+
+ if (!(m->init = lt_dlsym(m->dl, "module_init")))
+ goto fail;
+
+ if (!(m->done = lt_dlsym(m->dl, "module_done")))
+ goto fail;
+
+ m->name = strdup(name);
+ m->argument = argument ? strdup(argument) : NULL;
+ m->userdata = NULL;
+
+ assert(m->init);
+ if (m->init(c, m) < 0)
+ goto fail;
+
+ if (!c->modules)
+ c->modules = idxset_new(NULL, NULL);
+
+ assert(c->modules);
+ r = idxset_put(c->modules, m, &m->index);
+ assert(r >= 0 && m->index != IDXSET_INVALID);
+ return m;
+
+fail:
+ if (m) {
+ if (m->dl)
+ lt_dlclose(m->dl);
+
+ free(m);
+ }
+
+ return NULL;
+}
+
+static void module_free(struct module *m) {
+ assert(m && m->done);
+ m->done(c, m);
+
+ lt_dlcose(m->dl);
+ free(m->name);
+ free(m->argument);
+ free(m);
+}
+
+void module_unload(struct core *c, struct module *m) {
+ struct module *m;
+ assert(c && index != IDXSET_INVALID);
+
+ assert(c->modules);
+ if (!(m = idxset_remove_by_data(c->modules, m, NULL)))
+ return;
+
+ module_free(m);
+}
+
+void module_unload_by_index(struct core *c, guint32_t index) {
+ struct module *m;
+ assert(c && index != IDXSET_INVALID);
+
+ assert(c->modules);
+ if (!(m = idxset_remove_by_index(c->modules, index)))
+ return;
+
+ module_free(m);
+}
+
+
+void free_callback(void *p, void *userdata) {
+ struct module *m = p;
+ assert(m);
+ module_free(m);
+}
+
+void module_unload_all(struct core *c) {
+ assert(c);
+
+ if (!c->modules)
+ return;
+
+ idxset_free(c->modules, free_callback, NULL);
+ c->modules = NULL;
+}
+
diff --git a/src/module.h b/src/module.h
new file mode 100644
index 00000000..d0dfa045
--- /dev/null
+++ b/src/module.h
@@ -0,0 +1,27 @@
+#ifndef foomodulehfoo
+#define foomodulehfoo
+
+#include <inttypes.h>
+#include <ltdl.h>
+
+#include "core.h"
+
+struct module {
+ char *name, *argument;
+ uint32_t index;
+
+ lt_dlhandle *dl;
+
+ int (*init)(struct core *c, struct module*m);
+ void (*done)(struct core *c, struct module*m);
+
+ void *userdata;
+};
+
+struct module* module_load(struct core *c, const char *name, const char*argument);
+void module_unload(struct core *c, struct module *m);
+void module_unload_by_index(struct core *c, uint32_t index);
+
+void module_unload_all(struct core *c);
+
+#endif
diff --git a/src/oss.c b/src/oss.c
new file mode 100644
index 00000000..42e60360
--- /dev/null
+++ b/src/oss.c
@@ -0,0 +1,30 @@
+#include "module.h"
+
+struct userdata {
+ struct sink *sink;
+ struct source *source;
+ int fd;
+};
+
+int module_init(struct core *c, struct module*m) {
+ struct userdata *u;
+ assert(c && m);
+
+ u = malloc(sizeof(struct userdata));
+ assert(u);
+ memset(u, 0, sizeof(struct userdata));
+ m->userdata = u;
+
+ return 0;
+}
+
+void module_done(struct core *c, struct module*m) {
+ struct userdata *u;
+ assert(c && m);
+
+ u = m->userdata;
+
+ sink_free(u->sink);
+ source_free(u->source);
+ free(u);
+}
diff --git a/src/outputstream.c b/src/outputstream.c
new file mode 100644
index 00000000..ffec77da
--- /dev/null
+++ b/src/outputstream.c
@@ -0,0 +1,41 @@
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "outputstream.h"
+
+struct output_stream* output_stream_new(struct source *s, struct sample_spec *spec, const char *name) {
+ struct output_stream *o;
+ int r;
+ assert(s && spec);
+
+ o = malloc(sizeof(struct output_stream));
+ assert(o);
+ o->name = name ? strdup(name) : NULL;
+ o->source = s;
+ o->spec = *spec;
+
+ o->memblockq = memblockq_new(bytes_per_second(spec)*5, sample_size(spec));
+ assert(o->memblockq);
+
+ assert(s->core);
+ r = idxset_put(s->core->output_streams, o, &o->index);
+ assert(r == 0 && o->index != IDXSET_INVALID);
+ r = idxset_put(s->output_streams, o, NULL);
+ assert(r == 0);
+
+ return o;
+}
+
+void output_stream_free(struct output_stream* o) {
+ assert(o);
+
+ memblockq_free(o->memblockq);
+
+ assert(o->source && o->source->core);
+ idxset_remove_by_data(o->source->core->output_streams, o, NULL);
+ idxset_remove_by_data(o->source->output_streams, o, NULL);
+
+ free(o->name);
+ free(o);
+}
diff --git a/src/outputstream.h b/src/outputstream.h
new file mode 100644
index 00000000..41054341
--- /dev/null
+++ b/src/outputstream.h
@@ -0,0 +1,22 @@
+#ifndef foooutputstreamhfoo
+#define foooutputstreamhfoo
+
+#include <inttypes.h>
+#include "source.h"
+#include "sample.h"
+#include "memblockq.h"
+
+struct output_stream {
+ char *name;
+ uint32_t index;
+
+ struct source *source;
+ struct sample_spec spec;
+
+ struct memblockq *memblockq;
+};
+
+struct output_stream* output_stream_new(struct source *s, struct sample_spec *spec, const char *name);
+void output_stream_free(struct output_stream* o);
+
+#endif
diff --git a/src/packet.c b/src/packet.c
new file mode 100644
index 00000000..086e4b2a
--- /dev/null
+++ b/src/packet.c
@@ -0,0 +1,29 @@
+#include <assert.h>
+#include <stdlib.h>
+
+#include "packet.h"
+
+struct packet* packet_new(uint32_t length) {
+ struct packet *p;
+ assert(length);
+ p = malloc(sizeof(struct packet)+length);
+ assert(p);
+
+ p->ref = 1;
+ p->length = length;
+ return p;
+}
+
+struct packet* packet_ref(struct packet *p) {
+ assert(p && p->ref >= 1);
+ p->ref++;
+ return p;
+}
+
+void packet_unref(struct packet *p) {
+ assert(p && p->ref >= 1);
+ p->ref--;
+
+ if (p->ref == 0)
+ free(p);
+}
diff --git a/src/packet.h b/src/packet.h
new file mode 100644
index 00000000..781c0e66
--- /dev/null
+++ b/src/packet.h
@@ -0,0 +1,18 @@
+#ifndef foopackethfoo
+#define foopackethfoo
+
+#include <sys/types.h>
+#include <stdint.h>
+
+struct packet {
+ unsigned ref;
+ size_t length;
+ uint8_t data[];
+};
+
+struct packet* packet_new(uint32_t length);
+
+struct packet* packet_ref(struct packet *p);
+void packet_unref(struct packet *p);
+
+#endif
diff --git a/src/protocol-native-tcp.c b/src/protocol-native-tcp.c
new file mode 100644
index 00000000..b33f3e15
--- /dev/null
+++ b/src/protocol-native-tcp.c
@@ -0,0 +1,19 @@
+#include "module.h"
+
+int module_init(struct core *c, struct module*m) {
+ struct socket_server *s;
+ assert(c && m);
+
+ if (!(s = socket_server_new_ipv4(c->mainloop, INADDR_LOOPBACK, 4711)))
+ return -1;
+
+ m->userdata = protocol_native_new(s);
+ assert(m->userdata);
+ return 0;
+}
+
+void module_done(struct core *c, struct module*m) {
+ assert(c && m);
+
+ protocol_native_free(m->userdata);
+}
diff --git a/src/protocol-native-unix.c b/src/protocol-native-unix.c
new file mode 100644
index 00000000..a18965cd
--- /dev/null
+++ b/src/protocol-native-unix.c
@@ -0,0 +1,27 @@
+#include "module.h"
+
+int module_init(struct core *c, struct module*m) {
+ struct fn[PATH_MAX];
+ struct socket_server *s;
+ char *t;
+ assert(c && m);
+
+ if (!(t = getenv("TMP")))
+ if (!(t = getenv("TEMP")))
+ t = "/tmp";
+
+ snprintf(fn, sizeof(fn), "%s/foosock", t);
+
+ if (!(s = socket_server_new_unix(c->mainloop, fn)))
+ return -1;
+
+ m->userdata = protocol_native_new(s);
+ assert(m->userdata);
+ return 0;
+}
+
+void module_done(struct core *c, struct module*m) {
+ assert(c && m);
+
+ protocol_native_free(m->userdata);
+}
diff --git a/src/protocol-native.c b/src/protocol-native.c
new file mode 100644
index 00000000..bdb69355
--- /dev/null
+++ b/src/protocol-native.c
@@ -0,0 +1,49 @@
+#include "protocol-native.h"
+
+struct protocol_native {
+ struct socket_server*server;
+ struct idxset *connection;
+};
+
+struct stream_info {
+ guint32_t tag;
+
+ union {
+ struct output_stream *output_stream;
+ struct input_stream *input_stream;
+ }
+};
+
+struct connection {
+ struct client *client;
+ struct serializer *serializer;
+
+
+};
+
+static void on_connection(struct socket_server *server, struct iochannel *io, void *userdata) {
+ struct protocol_native *p = userdata;
+ assert(server && io && p && p->server == server);
+
+
+}
+
+struct protocol_native* protocol_native(struct socket_server *server) {
+ struct protocol_native *p;
+ assert(server);
+
+ p = malloc(sizeof(struct protocol_native));
+ assert(p);
+
+ p->server = server;
+ socket_server_set_callback(p->server, callback, p);
+
+ return p;
+}
+
+void protocol_native_free(struct protocol_native *p) {
+ assert(p);
+
+ socket_server_free(p->server);
+ free(p);
+}
diff --git a/src/protocol-native.h b/src/protocol-native.h
new file mode 100644
index 00000000..bdad03b4
--- /dev/null
+++ b/src/protocol-native.h
@@ -0,0 +1,9 @@
+#ifndef fooprotocolnativehfoo
+#define fooprotocolnativehfoo
+
+struct protocol_native;
+
+struct protocol_native* protocol_native(struct socket_server *server);
+void protocol_native_free(struct protocol_native *n);
+
+#endif
diff --git a/src/protocol-simple-tcp.c b/src/protocol-simple-tcp.c
new file mode 100644
index 00000000..e71d7142
--- /dev/null
+++ b/src/protocol-simple-tcp.c
@@ -0,0 +1,24 @@
+#include <assert.h>
+#include <arpa/inet.h>
+
+#include "module.h"
+#include "socket-server.h"
+#include "protocol-simple.h"
+
+int module_init(struct core *c, struct module*m) {
+ struct socket_server *s;
+ assert(c && m);
+
+ if (!(s = socket_server_new_ipv4(c->mainloop, INADDR_LOOPBACK, 4712)))
+ return -1;
+
+ m->userdata = protocol_simple_new(c, s, PROTOCOL_SIMPLE_PLAYBACK);
+ assert(m->userdata);
+ return 0;
+}
+
+void module_done(struct core *c, struct module*m) {
+ assert(c && m);
+
+ protocol_simple_free(m->userdata);
+}
diff --git a/src/protocol-simple.c b/src/protocol-simple.c
new file mode 100644
index 00000000..3335bc14
--- /dev/null
+++ b/src/protocol-simple.c
@@ -0,0 +1,173 @@
+#include <assert.h>
+#include <stdlib.h>
+#include <limits.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+
+#include "inputstream.h"
+#include "outputstream.h"
+#include "protocol-simple.h"
+#include "client.h"
+
+struct connection {
+ struct protocol_simple *protocol;
+ struct iochannel *io;
+ struct input_stream *istream;
+ struct output_stream *ostream;
+ struct client *client;
+};
+
+struct protocol_simple {
+ struct core *core;
+ struct socket_server*server;
+ struct idxset *connections;
+ enum protocol_simple_mode mode;
+};
+
+#define BUFSIZE PIPE_BUF
+
+static void free_connection(void *data, void *userdata) {
+ struct connection *c = data;
+ assert(data);
+
+ if (c->istream)
+ input_stream_free(c->istream);
+ if (c->ostream)
+ output_stream_free(c->ostream);
+
+ client_free(c->client);
+
+ iochannel_free(c->io);
+ free(c);
+}
+
+static void io_callback(struct iochannel*io, void *userdata) {
+ struct connection *c = userdata;
+ assert(io && c);
+
+ if (c->istream && iochannel_is_readable(io)) {
+ struct memchunk chunk;
+ ssize_t r;
+
+ chunk.memblock = memblock_new(BUFSIZE);
+ assert(chunk.memblock);
+
+ if ((r = iochannel_read(io, chunk.memblock->data, BUFSIZE)) <= 0) {
+ fprintf(stderr, "read(): %s\n", r == 0 ? "EOF" : strerror(errno));
+ memblock_unref(chunk.memblock);
+ goto fail;
+ }
+
+ chunk.memblock->length = r;
+ chunk.length = r;
+ chunk.index = 0;
+
+ memblockq_push(c->istream->memblockq, &chunk, 0);
+ input_stream_notify(c->istream);
+ memblock_unref(chunk.memblock);
+ }
+
+ if (c->ostream && iochannel_is_writable(io)) {
+ struct memchunk chunk;
+ ssize_t r;
+
+ memblockq_peek(c->ostream->memblockq, &chunk);
+ assert(chunk.memblock && chunk.length);
+
+ if ((r = iochannel_write(io, chunk.memblock->data+chunk.index, chunk.length)) < 0) {
+ fprintf(stderr, "write(): %s\n", strerror(errno));
+ memblock_unref(chunk.memblock);
+ goto fail;
+ }
+
+ memblockq_drop(c->ostream->memblockq, r);
+ memblock_unref(chunk.memblock);
+ }
+
+ return;
+
+fail:
+ idxset_remove_by_data(c->protocol->connections, c, NULL);
+ free_connection(c, NULL);
+}
+
+static void on_connection(struct socket_server*s, struct iochannel *io, void *userdata) {
+ struct protocol_simple *p = userdata;
+ struct connection *c = NULL;
+ assert(s && io && p);
+
+ c = malloc(sizeof(struct connection));
+ assert(c);
+ c->io = io;
+ c->istream = NULL;
+ c->ostream = NULL;
+ c->protocol = p;
+
+ if (p->mode & PROTOCOL_SIMPLE_RECORD) {
+ struct source *source;
+
+ if (!(source = core_get_default_source(p->core))) {
+ fprintf(stderr, "Failed to get default source.\n");
+ goto fail;
+ }
+
+ c->ostream = output_stream_new(source, &DEFAULT_SAMPLE_SPEC, c->client->name);
+ assert(c->ostream);
+ }
+
+ if (p->mode & PROTOCOL_SIMPLE_PLAYBACK) {
+ struct sink *sink;
+
+ if (!(sink = core_get_default_sink(p->core))) {
+ fprintf(stderr, "Failed to get default sink.\n");
+ goto fail;
+ }
+
+ c->istream = input_stream_new(sink, &DEFAULT_SAMPLE_SPEC, c->client->name);
+ assert(c->istream);
+ }
+
+ c->client = client_new(p->core, "SIMPLE", "Client");
+ assert(c->client);
+
+ iochannel_set_callback(c->io, io_callback, c);
+ idxset_put(p->connections, c, NULL);
+ return;
+
+fail:
+ if (c) {
+ if (c->istream)
+ input_stream_free(c->istream);
+ if (c->ostream)
+ output_stream_free(c->ostream);
+
+ iochannel_free(c->io);
+ free(c);
+ }
+}
+
+struct protocol_simple* protocol_simple_new(struct core *core, struct socket_server *server, enum protocol_simple_mode mode) {
+ struct protocol_simple* p;
+ assert(core && server && mode <= PROTOCOL_SIMPLE_DUPLEX && mode > 0);
+
+ p = malloc(sizeof(struct protocol_simple));
+ assert(p);
+ p->core = core;
+ p->server = server;
+ p->connections = idxset_new(NULL, NULL);
+ p->mode = mode;
+
+ socket_server_set_callback(p->server, on_connection, p);
+
+ return p;
+}
+
+
+void protocol_simple_free(struct protocol_simple *p) {
+ assert(p);
+
+ idxset_free(p->connections, free_connection, NULL);
+ socket_server_free(p->server);
+ free(p);
+}
diff --git a/src/protocol-simple.h b/src/protocol-simple.h
new file mode 100644
index 00000000..f6210436
--- /dev/null
+++ b/src/protocol-simple.h
@@ -0,0 +1,17 @@
+#ifndef fooprotocolsimplehfoo
+#define fooprotocolsimplehfoo
+
+#include "socket-server.h"
+
+struct protocol_simple;
+
+enum protocol_simple_mode {
+ PROTOCOL_SIMPLE_RECORD = 1,
+ PROTOCOL_SIMPLE_PLAYBACK = 2,
+ PROTOCOL_SIMPLE_DUPLEX = 3
+};
+
+struct protocol_simple* protocol_simple_new(struct core *core, struct socket_server *server, enum protocol_simple_mode mode);
+void protocol_simple_free(struct protocol_simple *n);
+
+#endif
diff --git a/src/pstream.c b/src/pstream.c
new file mode 100644
index 00000000..083ebc22
--- /dev/null
+++ b/src/pstream.c
@@ -0,0 +1,359 @@
+#include <stdlib.h>
+#include <assert.h>
+
+#include "pstream.h"
+#include "queue.h"
+
+enum pstream_descriptor_index {
+ PSTREAM_DESCRIPTOR_LENGTH,
+ PSTREAM_DESCRIPTOR_CHANNEL,
+ PSTREAM_DESCRIPTOR_DELTA,
+ PSTREAM_DESCRIPTOR_MAX
+};
+
+typedef uint32_t pstream_descriptor[PSTREAM_DESCRIPTOR_MAX];
+
+#define PSTREAM_DESCRIPTOR_SIZE (PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t))
+#define FRAME_SIZE_MAX (1024*64)
+
+struct item_info {
+ enum { PSTREAM_ITEM_PACKET, PSTREAM_ITEM_MEMBLOCK } type;
+
+ /* memblock info */
+ struct memchunk chunk;
+ uint32_t channel;
+ int32_t delta;
+
+ /* packet info */
+ struct packet *packet;
+};
+
+struct pstream {
+ struct mainloop *mainloop;
+ struct mainloop_source *mainloop_source;
+ struct iochannel *io;
+ struct queue *send_queue;
+
+ int dead;
+
+ struct {
+ struct item_info* current;
+ pstream_descriptor descriptor;
+ void *data;
+ size_t index;
+ } write;
+
+ void (*send_callback) (struct pstream *p, void *userdata);
+ void *send_callback_userdata;
+
+ struct {
+ struct memblock *memblock;
+ struct packet *packet;
+ pstream_descriptor descriptor;
+ void *data;
+ size_t index;
+ } read;
+
+ void (*recieve_packet_callback) (struct pstream *p, struct packet *packet, void *userdata);
+ void *recieve_packet_callback_userdata;
+
+ void (*recieve_memblock_callback) (struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata);
+ void *recieve_memblock_callback_userdata;
+};
+
+static void do_write(struct pstream *p);
+static void do_read(struct pstream *p);
+
+static void io_callback(struct iochannel*io, void *userdata) {
+ struct pstream *p = userdata;
+ assert(p && p->io == io);
+ do_write(p);
+ do_read(p);
+}
+
+static void prepare_callback(struct mainloop_source *s, void*userdata) {
+ struct pstream *p = userdata;
+ assert(p && p->mainloop_source == s);
+ do_write(p);
+ do_read(p);
+}
+
+struct pstream *pstream_new(struct mainloop *m, struct iochannel *io) {
+ struct pstream *p;
+ assert(io);
+
+ p = malloc(sizeof(struct pstream));
+ assert(p);
+
+ p->io = io;
+ iochannel_set_callback(io, io_callback, p);
+
+ p->dead = 0;
+
+ p->mainloop = m;
+ p->mainloop_source = mainloop_source_new_prepare(m, prepare_callback, p);
+ mainloop_source_enable(p->mainloop_source, 0);
+
+ p->send_queue = queue_new();
+ assert(p->send_queue);
+
+ p->write.current = NULL;
+ p->write.index = 0;
+
+ p->read.memblock = NULL;
+ p->read.packet = NULL;
+ p->read.index = 0;
+
+ p->send_callback = NULL;
+ p->send_callback_userdata = NULL;
+
+ p->recieve_packet_callback = NULL;
+ p->recieve_packet_callback_userdata = NULL;
+
+ p->recieve_memblock_callback = NULL;
+ p->recieve_memblock_callback_userdata = NULL;
+
+ return p;
+}
+
+static void item_free(void *item, void *p) {
+ struct item_info *i = item;
+ assert(i);
+
+ if (i->type == PSTREAM_ITEM_PACKET) {
+ assert(i->chunk.memblock);
+ memblock_unref(i->chunk.memblock);
+ } else {
+ assert(i->type == PSTREAM_ITEM_MEMBLOCK);
+ assert(i->packet);
+ packet_unref(i->packet);
+ }
+
+ free(i);
+}
+
+void pstream_free(struct pstream *p) {
+ assert(p);
+
+ iochannel_free(p->io);
+ queue_free(p->send_queue, item_free, NULL);
+
+ if (p->write.current)
+ item_free(p->write.current, NULL);
+
+ if (p->read.memblock)
+ memblock_unref(p->read.memblock);
+
+ if (p->read.packet)
+ packet_unref(p->read.packet);
+
+ mainloop_source_free(p->mainloop_source);
+ free(p);
+}
+
+void pstream_set_send_callback(struct pstream*p, void (*callback) (struct pstream *p, void *userdata), void *userdata) {
+ assert(p && callback);
+
+ p->send_callback = callback;
+ p->send_callback_userdata = userdata;
+}
+
+void pstream_send_packet(struct pstream*p, struct packet *packet) {
+ struct item_info *i;
+ assert(p && packet);
+
+ i = malloc(sizeof(struct item_info));
+ assert(i);
+ i->type = PSTREAM_ITEM_PACKET;
+ i->packet = packet;
+
+ queue_push(p->send_queue, i);
+ mainloop_source_enable(p->mainloop_source, 1);
+}
+
+void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, struct memchunk *chunk) {
+ struct item_info *i;
+ assert(p && channel && chunk);
+
+ i = malloc(sizeof(struct item_info));
+ assert(i);
+ i->type = PSTREAM_ITEM_MEMBLOCK;
+ i->chunk = *chunk;
+ i->channel = channel;
+ i->delta = delta;
+
+ queue_push(p->send_queue, i);
+ mainloop_source_enable(p->mainloop_source, 1);
+}
+
+void pstream_set_recieve_packet_callback(struct pstream *p, void (*callback) (struct pstream *p, struct packet *packet, void *userdata), void *userdata) {
+ assert(p && callback);
+
+ p->recieve_packet_callback = callback;
+ p->recieve_packet_callback_userdata = userdata;
+}
+
+void pstream_set_recieve_memblock_callback(struct pstream *p, void (*callback) (struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata), void *userdata) {
+ assert(p && callback);
+
+ p->recieve_memblock_callback = callback;
+ p->recieve_memblock_callback_userdata = userdata;
+}
+
+static void prepare_next_write_item(struct pstream *p) {
+ assert(p);
+
+ if (!(p->write.current = queue_pop(p->send_queue)))
+ return;
+
+ p->write.index = 0;
+
+ if (p->write.current->type == PSTREAM_ITEM_PACKET) {
+ assert(p->write.current->packet);
+ p->write.data = p->write.current->packet->data;
+ p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH] = p->write.current->packet->length;
+ p->write.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] = 0;
+ p->write.descriptor[PSTREAM_DESCRIPTOR_DELTA] = 0;
+ } else {
+ assert(p->write.current->type == PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock);
+ p->write.data = p->write.current->chunk.memblock->data + p->write.current->chunk.index;
+ p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH] = p->write.current->chunk.length;
+ p->write.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] = p->write.current->channel;
+ p->write.descriptor[PSTREAM_DESCRIPTOR_DELTA] = p->write.current->delta;
+ }
+}
+
+static void do_write(struct pstream *p) {
+ void *d;
+ size_t l;
+ ssize_t r;
+ assert(p);
+
+ mainloop_source_enable(p->mainloop_source, 0);
+
+ if (p->dead || !iochannel_is_writable(p->io))
+ return;
+
+ if (!p->write.current)
+ prepare_next_write_item(p);
+
+ if (!p->write.current)
+ return;
+
+ assert(p->write.data);
+
+ if (p->write.index < PSTREAM_DESCRIPTOR_SIZE) {
+ d = (void*) p->write.descriptor + p->write.index;
+ l = PSTREAM_DESCRIPTOR_SIZE - p->write.index;
+ } else {
+ d = (void*) p->write.data + p->write.index - PSTREAM_DESCRIPTOR_SIZE;
+ l = p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH] - p->write.index - PSTREAM_DESCRIPTOR_SIZE;
+ }
+
+ if ((r = iochannel_write(p->io, d, l)) < 0) {
+ p->dead = 1;
+ return;
+ }
+
+ p->write.index += r;
+
+ if (p->write.index >= PSTREAM_DESCRIPTOR_SIZE+p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH]) {
+ assert(p->write.current);
+ item_free(p->write.current, (void *) 1);
+ p->write.current = NULL;
+
+ if (p->send_callback && queue_is_empty(p->send_queue))
+ p->send_callback(p, p->send_callback_userdata);
+ }
+}
+
+static void do_read(struct pstream *p) {
+ void *d;
+ size_t l;
+ ssize_t r;
+ assert(p);
+
+ mainloop_source_enable(p->mainloop_source, 0);
+
+ if (p->dead || !iochannel_is_readable(p->io))
+ return;
+
+ if (p->read.index < PSTREAM_DESCRIPTOR_SIZE) {
+ d = (void*) p->read.descriptor + p->read.index;
+ l = PSTREAM_DESCRIPTOR_SIZE - p->read.index;
+ } else {
+ assert(p->read.data);
+ d = (void*) p->read.data + p->read.index - PSTREAM_DESCRIPTOR_SIZE;
+ l = p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH] - p->read.index - PSTREAM_DESCRIPTOR_SIZE;
+ }
+
+ if ((r = iochannel_read(p->io, d, l)) <= 0) {
+ p->dead = 1;
+ return;
+ }
+
+ p->read.index += r;
+
+ if (p->read.index == PSTREAM_DESCRIPTOR_SIZE) {
+ /* Reading of frame descriptor complete */
+
+ /* Frame size too large */
+ if (p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH] > FRAME_SIZE_MAX) {
+ p->dead = 1;
+ return;
+ }
+
+ assert(!p->read.packet && !p->read.memblock);
+
+ if (p->read.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] == 0) {
+ /* Frame is a packet frame */
+ p->read.packet = packet_new(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]);
+ assert(p->read.packet);
+ p->read.data = p->read.packet->data;
+ } else {
+ /* Frame is a memblock frame */
+ p->read.memblock = memblock_new(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]);
+ assert(p->read.memblock);
+ p->read.data = p->read.memblock->data;
+ }
+
+ } else if (p->read.index > PSTREAM_DESCRIPTOR_SIZE) {
+ /* Frame payload available */
+
+ if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblockd data? Than pass it to the user */
+ size_t l;
+
+ l = p->read.index - r < PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PSTREAM_DESCRIPTOR_SIZE : r;
+
+ if (l > 0) {
+ struct memchunk chunk;
+
+ chunk.memblock = p->read.memblock;
+ chunk.index = p->read.index - PSTREAM_DESCRIPTOR_SIZE - l;
+ chunk.length = l;
+
+ p->recieve_memblock_callback(p, p->read.descriptor[PSTREAM_DESCRIPTOR_CHANNEL], (int32_t) p->read.descriptor[PSTREAM_DESCRIPTOR_DELTA], &chunk, p->recieve_memblock_callback_userdata);
+ }
+ }
+
+ /* Frame complete */
+ if (p->read.index >= p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH] + PSTREAM_DESCRIPTOR_SIZE) {
+ if (p->read.memblock) {
+ assert(!p->read.packet);
+
+ memblock_unref(p->read.memblock);
+ p->read.memblock = NULL;
+ } else {
+ assert(p->read.packet);
+
+ if (p->recieve_packet_callback)
+ p->recieve_packet_callback(p, p->read.packet, p->recieve_packet_callback_userdata);
+
+ packet_unref(p->read.packet);
+ p->read.packet = NULL;
+ }
+
+ p->read.index = 0;
+ }
+ }
+}
diff --git a/src/pstream.h b/src/pstream.h
new file mode 100644
index 00000000..c0b57496
--- /dev/null
+++ b/src/pstream.h
@@ -0,0 +1,22 @@
+#ifndef foopstreamhfoo
+#define foopstreamhfoo
+
+#include <inttypes.h>
+
+#include "packet.h"
+#include "memblock.h"
+#include "iochannel.h"
+
+struct pstream;
+
+struct pstream* pstream_new(struct mainloop *m, struct iochannel *io);
+void pstream_free(struct pstream*p);
+
+void pstream_set_send_callback(struct pstream*p, void (*callback) (struct pstream *p, void *userdata), void *userdata);
+void pstream_send_packet(struct pstream*p, struct packet *packet);
+void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, struct memchunk *chunk);
+
+void pstream_set_recieve_packet_callback(struct pstream *p, void (*callback) (struct pstream *p, struct packet *packet, void *userdata), void *userdata);
+void pstream_set_recieve_memblock_callback(struct pstream *p, void (*callback) (struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata), void *userdata);
+
+#endif
diff --git a/src/queue.c b/src/queue.c
new file mode 100644
index 00000000..90823ae6
--- /dev/null
+++ b/src/queue.c
@@ -0,0 +1,77 @@
+#include <assert.h>
+#include <stdlib.h>
+
+#include "queue.h"
+
+struct queue_entry {
+ struct queue_entry *next;
+ void *data;
+};
+
+struct queue {
+ struct queue_entry *front, *back;
+ unsigned length;
+};
+
+struct queue* queue_new(void) {
+ struct queue *q = malloc(sizeof(struct queue));
+ assert(q);
+ q->front = q->back = NULL;
+ q->length = 0;
+ return q;
+}
+
+void queue_free(struct queue* q, void (*destroy)(void *p, void *userdata), void *userdata) {
+ struct queue_entry *e;
+ assert(q);
+
+ e = q->front;
+ while (e) {
+ struct queue_entry *n = e->next;
+
+ if (destroy)
+ destroy(e->data, userdata);
+
+ free(e);
+ e = n;
+ }
+
+ free(q);
+}
+
+void queue_push(struct queue *q, void *p) {
+ struct queue_entry *e;
+
+ e = malloc(sizeof(struct queue_entry));
+
+ e->data = p;
+ e->next = NULL;
+
+ if (q->back)
+ q->back->next = e;
+ else {
+ assert(!q->front);
+ q->front = e;
+ }
+
+ q->back = e;
+ q->length++;
+}
+
+void* queue_pop(struct queue *q) {
+ void *p;
+ struct queue_entry *e;
+ assert(q);
+
+ if (!(e = q->front))
+ return NULL;
+
+ q->front = e->next;
+ if (q->back == e)
+ q->back = NULL;
+
+ p = e->data;
+ free(e);
+
+ return p;
+}
diff --git a/src/queue.h b/src/queue.h
new file mode 100644
index 00000000..6b371a81
--- /dev/null
+++ b/src/queue.h
@@ -0,0 +1,13 @@
+#ifndef fooqueuehfoo
+#define fooqueuehfoo
+
+struct queue;
+
+struct queue* queue_new(void);
+void queue_free(struct queue* q, void (*destroy)(void *p, void *userdata), void *userdata);
+void queue_push(struct queue *q, void *p);
+void* queue_pop(struct queue *q);
+
+int queue_is_empty(struct queue *q);
+
+#endif
diff --git a/src/sample.c b/src/sample.c
new file mode 100644
index 00000000..74a54937
--- /dev/null
+++ b/src/sample.c
@@ -0,0 +1,80 @@
+#include <string.h>
+#include <assert.h>
+
+#include "sample.h"
+
+struct sample_spec default_sample_spec = {
+ .format = SAMPLE_S16NE,
+ .rate = 44100,
+ .channels = 2
+};
+
+struct memblock *silence(struct memblock* b, struct sample_spec *spec) {
+ char c;
+ assert(b && spec);
+ memblock_assert_exclusive(b);
+
+ switch (spec->format) {
+ case SAMPLE_U8:
+ c = 127;
+ break;
+ case SAMPLE_S16LE:
+ case SAMPLE_S16BE:
+ case SAMPLE_FLOAT32:
+ c = 0;
+ break;
+ case SAMPLE_ALAW:
+ case SAMPLE_ULAW:
+ c = 80;
+ break;
+ }
+
+ memset(b->data, c, b->length);
+ return b;
+}
+
+void add_clip(struct memchunk *target, struct memchunk *chunk, struct sample_spec *spec) {
+ int16_t *p, *d;
+ size_t i;
+ assert(target && target->memblock && chunk && chunk->memblock && spec);
+ assert(spec->format == SAMPLE_S16NE);
+ assert((target->length & 1) == 0);
+
+ d = target->memblock->data + target->index;
+ p = chunk->memblock->data + chunk->index;
+
+ for (i = 0; i < target->length && i < chunk->length; i++) {
+ int32_t r = (int32_t) *d + (int32_t) *p;
+ if (r < -0x8000) r = 0x8000;
+ if (r > 0x7FFF) r = 0x7FFF;
+ *d = (int16_t) r;
+ }
+}
+
+size_t sample_size(struct sample_spec *spec) {
+ assert(spec);
+ size_t b;
+
+ switch (spec->format) {
+ case SAMPLE_U8:
+ case SAMPLE_ULAW:
+ case SAMPLE_ALAW:
+ b = 1;
+ break;
+ case SAMPLE_S16LE:
+ case SAMPLE_S16BE:
+ b = 2;
+ break;
+ case SAMPLE_FLOAT32:
+ b = 4;
+ break;
+ }
+
+ return b * spec->channels;
+}
+
+size_t bytes_per_second(struct sample_spec *spec) {
+ assert(spec);
+ return spec->rate*sample_size(spec);
+}
+
diff --git a/src/sample.h b/src/sample.h
new file mode 100644
index 00000000..ecbe33f2
--- /dev/null
+++ b/src/sample.h
@@ -0,0 +1,35 @@
+#ifndef foosamplehfoo
+#define foosamplehfoo
+
+#include <inttypes.h>
+
+#include "memblock.h"
+
+enum sample_format {
+ SAMPLE_U8,
+ SAMPLE_ALAW,
+ SAMPLE_ULAW,
+ SAMPLE_S16LE,
+ SAMPLE_S16BE,
+ SAMPLE_FLOAT32
+};
+
+#define SAMPLE_S16NE SAMPLE_S16LE
+
+struct sample_spec {
+ enum sample_format format;
+ uint32_t rate;
+ uint32_t channels;
+};
+
+#define DEFAULT_SAMPLE_SPEC default_sample_spec
+
+extern struct sample_spec default_sample_spec;
+
+struct memblock *silence(struct memblock* b, struct sample_spec *spec);
+void add_clip(struct memchunk *target, struct memchunk *chunk, struct sample_spec *spec);
+
+size_t bytes_per_second(struct sample_spec *spec);
+size_t sample_size(struct sample_spec *spec);
+
+#endif
diff --git a/src/sink-pipe.c b/src/sink-pipe.c
new file mode 100644
index 00000000..4a8348f8
--- /dev/null
+++ b/src/sink-pipe.c
@@ -0,0 +1,155 @@
+#include <stdlib.h>
+#include <sys/stat.h>
+#include <stdio.h>
+#include <assert.h>
+#include <errno.h>
+#include <string.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <limits.h>
+
+#include "iochannel.h"
+#include "sink.h"
+#include "module.h"
+
+struct userdata {
+ struct sink *sink;
+ struct iochannel *io;
+ struct core *core;
+ struct mainloop_source *mainloop_source;
+
+ struct memchunk memchunk;
+};
+
+static void do_write(struct userdata *u) {
+ ssize_t r;
+ assert(u);
+
+ mainloop_source_enable(u->mainloop_source, 0);
+
+ if (!iochannel_is_writable(u->io))
+ return;
+
+ if (!u->memchunk.length)
+ if (sink_render(u->sink, PIPE_BUF, &u->memchunk) < 0)
+ return;
+
+ assert(u->memchunk.memblock && u->memchunk.length);
+
+ if ((r = iochannel_write(u->io, u->memchunk.memblock->data + u->memchunk.index, u->memchunk.length)) < 0) {
+ fprintf(stderr, "write() failed: %s\n", strerror(errno));
+ return;
+ }
+
+ u->memchunk.index += r;
+ u->memchunk.length -= r;
+
+ if (u->memchunk.length <= 0) {
+ memblock_unref(u->memchunk.memblock);
+ u->memchunk.memblock = NULL;
+ }
+}
+
+static void notify_callback(struct sink*s, void *userdata) {
+ struct userdata *u = userdata;
+ assert(u);
+
+ if (iochannel_is_writable(u->io))
+ mainloop_source_enable(u->mainloop_source, 1);
+}
+
+static void prepare_callback(struct mainloop_source *src, void *userdata) {
+ struct userdata *u = userdata;
+ assert(u);
+ do_write(u);
+}
+
+static void io_callback(struct iochannel *io, void*userdata) {
+ struct userdata *u = userdata;
+ assert(u);
+ do_write(u);
+}
+
+int module_init(struct core *c, struct module*m) {
+ struct userdata *u = NULL;
+ struct stat st;
+ struct sink *sink;
+ char *p;
+ int fd = -1;
+ const static struct sample_spec ss = {
+ .format = SAMPLE_S16NE,
+ .rate = 44100,
+ .channels = 2,
+ };
+ assert(c && m);
+
+ mkfifo((p = m->argument ? m->argument : "/tmp/musicfifo"), 0777);
+
+ if ((fd = open(p, O_RDWR) < 0)) {
+ fprintf(stderr, "open('%s'): %s\n", p, strerror(errno));
+ goto fail;
+ }
+
+ if (fstat(fd, &st) < 0) {
+ fprintf(stderr, "fstat('%s'): %s\n", p, strerror(errno));
+ goto fail;
+ }
+
+ if (!S_ISFIFO(st.st_mode)) {
+ fprintf(stderr, "'%s' is not a FIFO\n", p);
+ goto fail;
+ }
+
+ if (!(sink = sink_new(c, "fifo", &ss))) {
+ fprintf(stderr, "Failed to allocate new sink!\n");
+ goto fail;
+ }
+
+ u = malloc(sizeof(struct userdata));
+ assert(u);
+
+ u->core = c;
+ u->sink = sink;
+ sink_set_notify_callback(sink, notify_callback, u);
+
+ u->io = iochannel_new(c->mainloop, -1, fd);
+ assert(u->io);
+ iochannel_set_callback(u->io, io_callback, u);
+
+ u->memchunk.memblock = NULL;
+ u->memchunk.length = 0;
+
+ u->mainloop_source = mainloop_source_new_prepare(c->mainloop, prepare_callback, u);
+ assert(u->mainloop_source);
+ mainloop_source_enable(u->mainloop_source, 0);
+
+ m->userdata = u;
+
+
+ return 0;
+
+fail:
+ if (fd >= 0)
+ close(fd);
+
+ if (u)
+ free(u);
+
+ return -1;
+}
+
+void module_done(struct core *c, struct module*m) {
+ struct userdata *u;
+ assert(c && m);
+
+ u = m->userdata;
+ assert(u);
+
+ if (u->memchunk.memblock)
+ memblock_unref(u->memchunk.memblock);
+
+ sink_free(u->sink);
+ iochannel_free(u->io);
+ mainloop_source_free(u->mainloop_source);
+ free(u);
+}
diff --git a/src/sink.c b/src/sink.c
new file mode 100644
index 00000000..ac387c78
--- /dev/null
+++ b/src/sink.c
@@ -0,0 +1,217 @@
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+#include <stdio.h>
+
+#include "sink.h"
+#include "inputstream.h"
+
+struct sink* sink_new(struct core *core, const char *name, const struct sample_spec *spec) {
+ struct sink *s;
+ char *n = NULL;
+ int r;
+ assert(core && spec);
+
+ s = malloc(sizeof(struct sink));
+ assert(s);
+
+ s->name = name ? strdup(name) : NULL;
+ r = idxset_put(core->sinks, s, &s->index);
+ assert(s->index != IDXSET_INVALID && r >= 0);
+
+ s->core = core;
+ s->sample_spec = *spec;
+ s->input_streams = idxset_new(NULL, NULL);
+
+ if (name) {
+ n = malloc(strlen(name)+9);
+ sprintf(n, "%s_monitor", name);
+ }
+
+ s->monitor_source = source_new(core, n, spec);
+ s->volume = 0xFF;
+
+ s->notify_callback = NULL;
+ s->userdata = NULL;
+
+ return s;
+}
+
+void sink_free(struct sink *s) {
+ struct input_stream *i;
+ assert(s);
+
+ idxset_remove_by_data(s->core->sinks, s, NULL);
+ source_free(s->monitor_source);
+
+ while ((i = idxset_rrobin(s->input_streams, NULL)))
+ input_stream_free(i);
+
+ free(s->name);
+ free(s);
+}
+
+struct pass1_info {
+ size_t maxlength;
+ unsigned count;
+ struct input_stream *last_input_stream;
+};
+
+static int get_max_length(void *p, uint32_t index, int *del, void*userdata) {
+ struct memchunk chunk;
+ struct pass1_info *info = userdata;
+ struct input_stream*i = p;
+ assert(info && i);
+
+ if (memblockq_peek(i->memblockq, &chunk) != 0)
+ return 0;
+
+ assert(chunk.length);
+
+ if (info->maxlength > chunk.length)
+ info->maxlength = chunk.length;
+
+ info->count++;
+ info->last_input_stream = i;
+
+ return 0;
+}
+
+struct pass2_info {
+ struct memchunk *chunk;
+ struct sample_spec *spec;
+};
+
+static int do_mix(void *p, uint32_t index, int *del, void*userdata) {
+ struct memchunk chunk;
+ struct pass2_info *info = userdata;
+ struct input_stream*i = p;
+ assert(info && info->chunk && info->chunk->memblock && i && info->spec);
+
+ if (memblockq_peek(i->memblockq, &chunk) != 0)
+ return 0;
+
+ memblock_assert_exclusive(info->chunk->memblock);
+ assert(chunk.length && chunk.length <= info->chunk->memblock->length - info->chunk->index);
+
+ add_clip(info->chunk, &chunk, info->spec);
+ return 0;
+}
+
+int sink_render_into(struct sink*s, struct memblock *target, struct memchunk *result) {
+ struct pass1_info pass1_info;
+ struct pass2_info pass2_info;
+ assert(s && target && result);
+ memblock_assert_exclusive(target);
+
+ /* Calculate how many bytes to mix */
+ pass1_info.maxlength = target->length;
+ pass1_info.count = 0;
+
+ idxset_foreach(s->input_streams, get_max_length, &pass1_info);
+ assert(pass1_info.maxlength);
+
+ /* No data to mix */
+ if (pass1_info.count == 0)
+ return -1;
+
+ /* A shortcut if only a single input stream is connected */
+ if (pass1_info.count == 1) {
+ struct input_stream *i = pass1_info.last_input_stream;
+ struct memchunk chunk;
+ size_t l;
+
+ assert(i);
+
+ if (memblockq_peek(i->memblockq, &chunk) != 0)
+ return -1;
+
+ l = target->length < chunk.length ? target->length : chunk.length;
+ memcpy(target->data, result->memblock+result->index, l);
+ target->length = l;
+ memblock_unref(chunk.memblock);
+ memblockq_drop(i->memblockq, l);
+
+ result->memblock = target;
+ result->length = l;
+ result->index = 0;
+ return 0;
+ }
+
+ /* Do the real mixing */
+ result->memblock = silence(target, &s->sample_spec);
+ result->index = 0;
+ result->length = pass1_info.maxlength;
+ pass2_info.chunk = result;
+ pass2_info.spec = &s->sample_spec;
+ idxset_foreach(s->input_streams, do_mix, &pass2_info);
+
+ assert(s->monitor_source);
+ source_post(s->monitor_source, result);
+
+ return 0;
+}
+
+int sink_render(struct sink*s, size_t length, struct memchunk *result) {
+ struct pass1_info pass1_info;
+ struct pass2_info pass2_info;
+ assert(s && result);
+
+ if (!length)
+ length = (size_t) -1;
+
+ /* Calculate how many bytes to mix */
+ pass1_info.maxlength = length;
+ pass1_info.count = 0;
+
+ idxset_foreach(s->input_streams, get_max_length, &pass1_info);
+ assert(pass1_info.maxlength);
+
+ /* No data to mix */
+ if (pass1_info.count == 0)
+ return -1;
+
+ if (pass1_info.count == 1) {
+ struct input_stream *i = pass1_info.last_input_stream;
+ size_t l;
+
+ assert(i);
+
+ if (memblockq_peek(i->memblockq, result) != 0)
+ return -1;
+
+ l = length < result->length ? length : result->length;
+ result->length = l;
+ memblockq_drop(i->memblockq, l);
+ return 0;
+ }
+
+ /* Do the mixing */
+ result->memblock = silence(memblock_new(result->length), &s->sample_spec);
+ result->index = 0;
+ result->length = pass1_info.maxlength;
+ pass2_info.chunk = result;
+ pass2_info.spec = &s->sample_spec;
+ idxset_foreach(s->input_streams, do_mix, &pass2_info);
+
+ assert(s->monitor_source);
+
+ source_post(s->monitor_source, result);
+ return 0;
+}
+
+void sink_notify(struct sink*s) {
+ assert(s);
+
+ if (s->notify_callback)
+ s->notify_callback(s, s->userdata);
+}
+
+void sink_set_notify_callback(struct sink *s, void (*notify_callback)(struct sink*sink, void *userdata), void *userdata) {
+ assert(s && notify_callback);
+
+ s->notify_callback = notify_callback;
+ s->userdata = userdata;
+}
+
+
diff --git a/src/sink.h b/src/sink.h
new file mode 100644
index 00000000..a6f98005
--- /dev/null
+++ b/src/sink.h
@@ -0,0 +1,38 @@
+#ifndef foosinkhfoo
+#define foosinkhfoo
+
+struct sink;
+
+#include <inttypes.h>
+
+#include "core.h"
+#include "sample.h"
+#include "idxset.h"
+#include "source.h"
+
+struct sink {
+ char *name;
+ uint32_t index;
+
+ struct core *core;
+ struct sample_spec sample_spec;
+ struct idxset *input_streams;
+
+ struct source *monitor_source;
+
+ uint8_t volume;
+
+ void (*notify_callback)(struct sink*sink, void *userdata);
+ void *userdata;
+};
+
+struct sink* sink_new(struct core *core, const char *name, const struct sample_spec *spec);
+void sink_free(struct sink* s);
+
+int sink_render(struct sink*s, size_t length, struct memchunk *result);
+int sink_render_into(struct sink*s, struct memblock *target, struct memchunk *result);
+
+void sink_notify(struct sink*s);
+void sink_set_notify_callback(struct sink *s, void (*notify_callback)(struct sink*sink, void *userdata), void *userdata);
+
+#endif
diff --git a/src/socket-server.c b/src/socket-server.c
new file mode 100644
index 00000000..2a1db9a0
--- /dev/null
+++ b/src/socket-server.c
@@ -0,0 +1,157 @@
+#include <stdlib.h>
+#include <assert.h>
+#include <errno.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "socket-server.h"
+
+struct socket_server {
+ int fd;
+ char *filename;
+
+ void (*on_connection)(struct socket_server*s, struct iochannel *io, void *userdata);
+ void *userdata;
+
+ struct mainloop_source *mainloop_source;
+};
+
+static void callback(struct mainloop_source*src, int fd, enum mainloop_io_event event, void *userdata) {
+ struct socket_server *s = userdata;
+ struct iochannel *io;
+ int nfd;
+ assert(src && fd >= 0 && fd == s->fd && event == MAINLOOP_IO_EVENT_IN && s);
+
+ if ((nfd = accept(fd, NULL, NULL)) < 0) {
+ fprintf(stderr, "accept(): %s\n", strerror(errno));
+ return;
+ }
+
+ if (!s->on_connection) {
+ close(nfd);
+ return;
+ }
+
+ io = iochannel_new(mainloop_source_get_mainloop(src), nfd, nfd);
+ assert(io);
+ s->on_connection(s, io, s->userdata);
+}
+
+struct socket_server* socket_server_new(struct mainloop *m, int fd) {
+ struct socket_server *s;
+ assert(m && fd >= 0);
+
+ s = malloc(sizeof(struct socket_server));
+ assert(s);
+ s->fd = fd;
+ s->filename = NULL;
+ s->on_connection = NULL;
+ s->userdata = NULL;
+
+ s->mainloop_source = mainloop_source_new_io(m, fd, MAINLOOP_IO_EVENT_IN, callback, s);
+ assert(s->mainloop_source);
+
+ return s;
+}
+
+struct socket_server* socket_server_new_unix(struct mainloop *m, const char *filename) {
+ int fd = -1;
+ struct sockaddr_un sa;
+ struct socket_server *s;
+
+ assert(m && filename);
+
+ if ((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) < 0) {
+ fprintf(stderr, "socket(): %s\n", strerror(errno));
+ goto fail;
+ }
+
+ sa.sun_family = AF_LOCAL;
+ strncpy(sa.sun_path, filename, sizeof(sa.sun_path)-1);
+ sa.sun_path[sizeof(sa.sun_path) - 1] = 0;
+
+ if (bind(fd, (struct sockaddr*) &sa, SUN_LEN(&sa)) < 0) {
+ fprintf(stderr, "bind(): %s\n", strerror(errno));
+ goto fail;
+ }
+
+ if (listen(fd, 5) < 0) {
+ fprintf(stderr, "listen(): %s\n", strerror(errno));
+ goto fail;
+ }
+
+ s = socket_server_new(m, fd);
+ assert(s);
+
+ s->filename = strdup(filename);
+ assert(s->filename);
+
+ return s;
+
+fail:
+ if (fd >= 0)
+ close(fd);
+
+ return NULL;
+}
+
+struct socket_server* socket_server_new_ipv4(struct mainloop *m, uint32_t address, uint16_t port) {
+ int fd = -1;
+ struct sockaddr_in sa;
+
+ assert(m && port);
+
+ if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
+ fprintf(stderr, "socket(): %s\n", strerror(errno));
+ goto fail;
+ }
+
+ sa.sin_family = AF_INET;
+ sa.sin_port = htons(port);
+ sa.sin_addr.s_addr = htonl(address);
+
+ if (bind(fd, (struct sockaddr *) &sa, sizeof(sa)) < 0) {
+ fprintf(stderr, "bind(): %s\n", strerror(errno));
+ goto fail;
+ }
+
+ if (listen(fd, 5) < 0) {
+ fprintf(stderr, "listen(): %s\n", strerror(errno));
+ goto fail;
+ }
+
+ return socket_server_new(m, fd);
+
+fail:
+ if (fd >= 0)
+ close(fd);
+
+ return NULL;
+}
+
+void socket_server_free(struct socket_server*s) {
+ assert(s);
+ close(s->fd);
+
+ if (s->filename) {
+ unlink(s->filename);
+ free(s->filename);
+ }
+
+ mainloop_source_free(s->mainloop_source);
+
+ free(s);
+}
+
+void socket_server_set_callback(struct socket_server*s, void (*on_connection)(struct socket_server*s, struct iochannel *io, void *userdata), void *userdata) {
+ assert(s);
+
+ s->on_connection = on_connection;
+ s->userdata = userdata;
+}
diff --git a/src/socket-server.h b/src/socket-server.h
new file mode 100644
index 00000000..4814fc62
--- /dev/null
+++ b/src/socket-server.h
@@ -0,0 +1,18 @@
+#ifndef foosocketserverhfoo
+#define foosocketserverhfoo
+
+#include <inttypes.h>
+#include "mainloop.h"
+#include "iochannel.h"
+
+struct socket_server;
+
+struct socket_server* socket_server_new(struct mainloop *m, int fd);
+struct socket_server* socket_server_new_unix(struct mainloop *m, const char *filename);
+struct socket_server* socket_server_new_ipv4(struct mainloop *m, uint32_t address, uint16_t port);
+
+void socket_server_free(struct socket_server*s);
+
+void socket_server_set_callback(struct socket_server*s, void (*on_connection)(struct socket_server*s, struct iochannel *io, void *userdata), void *userdata);
+
+#endif
diff --git a/src/source.c b/src/source.c
new file mode 100644
index 00000000..2f34c461
--- /dev/null
+++ b/src/source.c
@@ -0,0 +1,58 @@
+#include <assert.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "source.h"
+#include "outputstream.h"
+
+struct source* source_new(struct core *core, const char *name, const struct sample_spec *spec) {
+ struct source *s;
+ int r;
+ assert(core && spec);
+
+ s = malloc(sizeof(struct source));
+ assert(s);
+
+ s->name = name ? strdup(name) : NULL;
+ r = idxset_put(core->sources, s, &s->index);
+ assert(s->index != IDXSET_INVALID && r >= 0);
+
+ s->core = core;
+ s->sample_spec = *spec;
+ s->output_streams = idxset_new(NULL, NULL);
+
+ s->link_change_callback = NULL;
+ s->userdata = NULL;
+
+ return s;
+}
+
+static void do_free(void *p, void *userdata) {
+ struct output_stream *o = p;
+ assert(o);
+ output_stream_free(o);
+};
+
+void source_free(struct source *s) {
+ assert(s);
+
+ idxset_remove_by_data(s->core->sources, s, NULL);
+ idxset_free(s->output_streams, do_free, NULL);
+ free(s->name);
+ free(s);
+}
+
+static int do_post(void *p, uint32_t index, int *del, void*userdata) {
+ struct memchunk *chunk = userdata;
+ struct output_stream *o = p;
+ assert(o && o->memblockq && index && del && chunk);
+
+ memblockq_push(o->memblockq, chunk, 0);
+ return 0;
+}
+
+void source_post(struct source*s, struct memchunk *chunk) {
+ assert(s && chunk);
+
+ idxset_foreach(s->output_streams, do_post, chunk);
+}
diff --git a/src/source.h b/src/source.h
new file mode 100644
index 00000000..3beb3f96
--- /dev/null
+++ b/src/source.h
@@ -0,0 +1,30 @@
+#ifndef foosourcehfoo
+#define foosourcehfoo
+
+struct source;
+
+#include <inttypes.h>
+#include "core.h"
+#include "sample.h"
+#include "idxset.h"
+#include "memblock.h"
+
+struct source {
+ char *name;
+ uint32_t index;
+
+ struct core *core;
+ struct sample_spec sample_spec;
+ struct idxset *output_streams;
+
+ void (*link_change_callback)(struct source*source, void *userdata);
+ void *userdata;
+};
+
+struct source* source_new(struct core *core, const char *name, const struct sample_spec *spec);
+void source_free(struct source *s);
+
+/* Pass a new memory block to all output streams */
+void source_post(struct source*s, struct memchunk *b);
+
+#endif
diff --git a/src/strbuf.c b/src/strbuf.c
new file mode 100644
index 00000000..7c8b965d
--- /dev/null
+++ b/src/strbuf.c
@@ -0,0 +1,122 @@
+#ifndef foostrbufhfoo
+#define foostrbufhfoo
+
+#include <sys/types.h>
+#include <stdlib.h>
+#include <assert.h>
+#include <string.h>
+#include <stdarg.h>
+#include <stdio.h>
+
+struct chunk {
+ struct chunk *next;
+ char text[];
+};
+
+struct strbuf {
+ size_t length;
+ struct chunk *head, *tail;
+};
+
+struct strbuf *strbuf_new(void) {
+ struct strbuf *sb = malloc(sizeof(struct strbuf));
+ assert(sb);
+ sb->length = 0;
+ sb->head = sb->tail = NULL;
+ return sb;
+}
+
+void strbuf_free(struct strbuf *sb) {
+ assert(sb);
+ while (sb->head) {
+ struct chunk *c = sb->head;
+ sb->head = sb->head->next;
+ free(c);
+ }
+
+ free(sb);
+}
+
+char *strbuf_tostring(struct strbuf *sb) {
+ char *t, *e;
+ struct chunk *c;
+ assert(sb);
+
+ t = malloc(sb->length+1);
+ assert(t);
+
+ e = t;
+ *e = 0;
+ for (c = sb->head; c; c = c->next) {
+ strcpy(e, c->text);
+ e = strchr(e, 0);
+ }
+
+ return t;
+}
+
+void strbuf_puts(struct strbuf *sb, const char *t) {
+ struct chunk *c;
+ size_t l;
+ assert(sb && t);
+
+ l = strlen(t);
+ c = malloc(sizeof(struct chunk)+l);
+ assert(c);
+
+ c->next = NULL;
+ strcpy(c->text, t);
+
+ if (sb->tail) {
+ assert(sb->head);
+ sb->tail->next = c;
+ } else {
+ assert(!sb->head);
+ sb->head = c;
+ }
+
+ sb->tail = c;
+ sb->length += l;
+}
+
+int strbuf_printf(struct strbuf *sb, const char *format, ...) {
+ int r, size = 100;
+ struct chunk *c = NULL;
+
+ assert(sb);
+
+ for(;;) {
+ va_list ap;
+
+ c = realloc(c, sizeof(struct chunk)+size);
+ assert(c);
+
+ va_start(ap, format);
+ r = vsnprintf(c->text, size, format, ap);
+ va_end(ap);
+
+ if (r > -1 && r < size) {
+ c->next = NULL;
+
+ if (sb->tail) {
+ assert(sb->head);
+ sb->tail->next = c;
+ } else {
+ assert(!sb->head);
+ sb->head = c;
+ }
+
+ sb->tail = c;
+ sb->length += r;
+
+ return r;
+ }
+
+ if (r > -1) /* glibc 2.1 */
+ size = r+1;
+ else /* glibc 2.0 */
+ size *= 2;
+ }
+}
+
+#endif
diff --git a/src/strbuf.h b/src/strbuf.h
new file mode 100644
index 00000000..6ad582a3
--- /dev/null
+++ b/src/strbuf.h
@@ -0,0 +1,13 @@
+#ifndef foostrbufhfoo
+#define foostrbufhfoo
+
+struct strbuf;
+
+struct strbuf *strbuf_new(void);
+void strbuf_free(struct strbuf *sb);
+char *strbuf_tostring(struct strbuf *sb);
+
+int strbuf_printf(struct strbuf *sb, const char *format, ...);
+void strbuf_puts(struct strbuf *sb, const char *t);
+
+#endif