From 9cb0b933e260008c6a03e24a4a149f726b8d86b2 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Tue, 8 Jun 2004 23:54:24 +0000 Subject: initial commit git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@3 fefdeb5f-60dc-0310-8127-8f9354f1896f --- src/Makefile | 10 ++ src/client.c | 32 ++++ src/client.h | 21 +++ src/core.c | 81 ++++++++++ src/core.h | 21 +++ src/idxset.c | 329 +++++++++++++++++++++++++++++++++++++++++ src/idxset.h | 28 ++++ src/inputstream.c | 50 +++++++ src/inputstream.h | 25 ++++ src/iochannel.c | 158 ++++++++++++++++++++ src/iochannel.h | 20 +++ src/main.c | 26 ++++ src/mainloop.c | 331 +++++++++++++++++++++++++++++++++++++++++ src/mainloop.h | 38 +++++ src/memblock.c | 67 +++++++++ src/memblock.h | 31 ++++ src/memblockq.c | 156 ++++++++++++++++++++ src/memblockq.h | 24 +++ src/module.c | 98 +++++++++++++ src/module.h | 27 ++++ src/oss.c | 30 ++++ src/outputstream.c | 41 ++++++ src/outputstream.h | 22 +++ src/packet.c | 29 ++++ src/packet.h | 18 +++ src/protocol-native-tcp.c | 19 +++ src/protocol-native-unix.c | 27 ++++ src/protocol-native.c | 49 +++++++ src/protocol-native.h | 9 ++ src/protocol-simple-tcp.c | 24 +++ src/protocol-simple.c | 173 ++++++++++++++++++++++ src/protocol-simple.h | 17 +++ src/pstream.c | 359 +++++++++++++++++++++++++++++++++++++++++++++ src/pstream.h | 22 +++ src/queue.c | 77 ++++++++++ src/queue.h | 13 ++ src/sample.c | 80 ++++++++++ src/sample.h | 35 +++++ src/sink-pipe.c | 155 +++++++++++++++++++ src/sink.c | 217 +++++++++++++++++++++++++++ src/sink.h | 38 +++++ src/socket-server.c | 157 ++++++++++++++++++++ src/socket-server.h | 18 +++ src/source.c | 58 ++++++++ src/source.h | 30 ++++ src/strbuf.c | 122 +++++++++++++++ src/strbuf.h | 13 ++ 47 files changed, 3425 insertions(+) create mode 100644 src/Makefile create mode 100644 src/client.c create mode 100644 src/client.h create mode 100644 src/core.c create mode 100644 src/core.h create mode 100644 src/idxset.c create mode 100644 src/idxset.h create mode 100644 src/inputstream.c create mode 100644 src/inputstream.h create mode 100644 src/iochannel.c create mode 100644 src/iochannel.h create mode 100644 src/main.c create mode 100644 src/mainloop.c create mode 100644 src/mainloop.h create mode 100644 src/memblock.c create mode 100644 src/memblock.h create mode 100644 src/memblockq.c create mode 100644 src/memblockq.h create mode 100644 src/module.c create mode 100644 src/module.h create mode 100644 src/oss.c create mode 100644 src/outputstream.c create mode 100644 src/outputstream.h create mode 100644 src/packet.c create mode 100644 src/packet.h create mode 100644 src/protocol-native-tcp.c create mode 100644 src/protocol-native-unix.c create mode 100644 src/protocol-native.c create mode 100644 src/protocol-native.h create mode 100644 src/protocol-simple-tcp.c create mode 100644 src/protocol-simple.c create mode 100644 src/protocol-simple.h create mode 100644 src/pstream.c create mode 100644 src/pstream.h create mode 100644 src/queue.c create mode 100644 src/queue.h create mode 100644 src/sample.c create mode 100644 src/sample.h create mode 100644 src/sink-pipe.c create mode 100644 src/sink.c create mode 100644 src/sink.h create mode 100644 src/socket-server.c create mode 100644 src/socket-server.h create mode 100644 src/source.c create mode 100644 src/source.h create mode 100644 src/strbuf.c create mode 100644 src/strbuf.h (limited to 'src') diff --git a/src/Makefile b/src/Makefile new file mode 100644 index 00000000..366e84e6 --- /dev/null +++ b/src/Makefile @@ -0,0 +1,10 @@ +CFLAGS=-Wall -pipe -ansi -D_GNU_SOURCE + +all: idxset.o queue.o strbuf.o mainloop.o iochannel.o packet.o \ + memblock.o sample.o socket-server.o memblockq.o client.o \ + core.o main.o outputstream.o inputstream.o source.o sink.o \ + pstream.o protocol-simple.o protocol-simple-tcp.o sink-pipe.o \ + module.o + +clean: + rm -f *.o diff --git a/src/client.c b/src/client.c new file mode 100644 index 00000000..56d85734 --- /dev/null +++ b/src/client.c @@ -0,0 +1,32 @@ +#include +#include +#include + +#include "client.h" + +struct client *client_new(struct core *core, const char *protocol_name, char *name) { + struct client *c; + int r; + assert(core); + + c = malloc(sizeof(struct client)); + assert(c); + c->protocol_name = protocol_name; + c->name = name ? strdup(name) : NULL; + c->kill = NULL; + c->userdata = NULL; + c->core = core; + + r = idxset_put(core->clients, c, &c->index); + assert(c->index != IDXSET_INVALID && r >= 0); + + return c; +} + +void client_free(struct client *c) { + assert(c && c->core); + + idxset_remove_by_data(c->core->clients, c, NULL); + free(c->name); + free(c); +} diff --git a/src/client.h b/src/client.h new file mode 100644 index 00000000..7128a452 --- /dev/null +++ b/src/client.h @@ -0,0 +1,21 @@ +#ifndef fooclienthfoo +#define fooclienthfoo + +#include "core.h" + +struct client { + char *name; + uint32_t index; + + const char *protocol_name; + + void *userdata; + void (*kill)(struct client *c); + + struct core *core; +}; + +struct client *client_new(struct core *c, const char *protocol_name, char *name); +void client_free(struct client *c); + +#endif diff --git a/src/core.c b/src/core.c new file mode 100644 index 00000000..7cfa66e3 --- /dev/null +++ b/src/core.c @@ -0,0 +1,81 @@ +#include +#include +#include + +#include "core.h" +#include "module.h" +#include "sink.h" +#include "source.h" + +struct core* core_new(struct mainloop *m) { + struct core* c; + c = malloc(sizeof(struct core)); + assert(c); + + c->mainloop = m; + c->clients = idxset_new(NULL, NULL); + c->sinks = idxset_new(NULL, NULL); + c->sources = idxset_new(NULL, NULL); + c->output_streams = idxset_new(NULL, NULL); + c->input_streams = idxset_new(NULL, NULL); + + c->default_source_index = c->default_sink_index = IDXSET_INVALID; + + c->modules = NULL; + + return c; +}; + +void core_free(struct core *c) { + assert(c); + + module_unload_all(c); + assert(!c->modules); + + assert(idxset_isempty(c->clients)); + idxset_free(c->clients, NULL, NULL); + + assert(idxset_isempty(c->sinks)); + idxset_free(c->sinks, NULL, NULL); + + assert(idxset_isempty(c->sources)); + idxset_free(c->sources, NULL, NULL); + + assert(idxset_isempty(c->output_streams)); + idxset_free(c->output_streams, NULL, NULL); + + assert(idxset_isempty(c->input_streams)); + idxset_free(c->input_streams, NULL, NULL); + + free(c); +}; + +struct sink* core_get_default_sink(struct core *c) { + struct sink *sink; + assert(c); + + if ((sink = idxset_get_by_index(c->sinks, c->default_sink_index))) + return sink; + + if (!(sink = idxset_rrobin(c->sinks, NULL))) + return NULL; + + fprintf(stderr, "Default sink vanished, setting to %u\n", sink->index); + c->default_sink_index = sink->index; + return sink; +} + +struct source* core_get_default_source(struct core *c) { + struct source *source; + assert(c); + + if ((source = idxset_get_by_index(c->sources, c->default_source_index))) + return source; + + if (!(source = idxset_rrobin(c->sources, NULL))) + return NULL; + + fprintf(stderr, "Default source vanished, setting to %u\n", source->index); + c->default_source_index = source->index; + return source; +} diff --git a/src/core.h b/src/core.h new file mode 100644 index 00000000..649c9dba --- /dev/null +++ b/src/core.h @@ -0,0 +1,21 @@ +#ifndef foocorehfoo +#define foocorehfoo + +#include "idxset.h" +#include "mainloop.h" + +struct core { + struct mainloop *mainloop; + + struct idxset *clients, *sinks, *sources, *output_streams, *input_streams, *modules; + + uint32_t default_source_index, default_sink_index; +}; + +struct core* core_new(struct mainloop *m); +void core_free(struct core*c); + +struct sink* core_get_default_sink(struct core *c); +struct source* core_get_default_source(struct core *c); + +#endif diff --git a/src/idxset.c b/src/idxset.c new file mode 100644 index 00000000..eaea34f4 --- /dev/null +++ b/src/idxset.c @@ -0,0 +1,329 @@ +#include +#include +#include + +#include "idxset.h" + +struct idxset_entry { + void *data; + uint32_t index; + unsigned hash_value; + + struct idxset_entry *hash_prev, *hash_next; + struct idxset_entry* iterate_prev, *iterate_next; +}; + +struct idxset { + unsigned (*hash_func) (void *p); + int (*compare_func)(void *a, void *b); + + unsigned hash_table_size, n_entries; + struct idxset_entry **hash_table, **array, *iterate_list_head, *iterate_list_tail, *rrobin; + uint32_t index, start_index, array_size; +}; + +static unsigned trivial_hash_func(void *p) { + return (unsigned) p; +} + +static int trivial_compare_func(void *a, void *b) { + return !(a == b); +} + +struct idxset* idxset_new(unsigned (*hash_func) (void *p), int (*compare_func) (void*a, void*b)) { + struct idxset *s; + + s = malloc(sizeof(struct idxset)); + assert(s); + s->hash_func = hash_func ? hash_func : trivial_hash_func; + s->compare_func = compare_func ? compare_func : trivial_compare_func; + s->hash_table_size = 1023; + s->hash_table = malloc(sizeof(struct idxset_entry*)*s->hash_table_size); + assert(s->hash_table); + s->array = NULL; + s->array_size = 0; + s->index = 0; + s->start_index = 0; + s->n_entries = 0; + + s->iterate_list_head = s->iterate_list_tail = NULL; + + return s; +} + +void idxset_free(struct idxset *s, void (*free_func) (void *p, void *userdata), void *userdata) { + assert(s); + + if (free_func) { + while (s->iterate_list_head) { + struct idxset_entry *e = s->iterate_list_head; + s->iterate_list_head = s->iterate_list_head->iterate_next; + + if (free_func) + free_func(e->data, userdata); + free(e); + } + } + + free(s->hash_table); + free(s->array); + free(s); +} + +static struct idxset_entry* hash_scan(struct idxset *s, struct idxset_entry* e, void *p) { + assert(p); + + assert(s->compare_func); + for (; e; e = e->hash_next) + if (s->compare_func(e->data, p)) + return e; + + return NULL; +} + +static void extend_array(struct idxset *s, uint32_t index) { + uint32_t i, j, l; + struct idxset_entry** n; + assert(index >= s->start_index ); + + if (index <= s->start_index + s->array_size) + return; + + for (i = 0; i < s->array_size; i++) + if (s->array[i]) + break; + + l = index - s->start_index - i + 100; + n = malloc(sizeof(struct hash_table_entry*)*l); + assert(n); + memset(n, 0, sizeof(struct hash_table_entry*)*l); + + for (j = 0; j < s->array_size-i; j++) + n[j] = s->array[i+j]; + + free(s->array); + + s->array = n; + s->array_size = l; + s->start_index += i; +} + +static struct idxset_entry** array_index(struct idxset*s, uint32_t index) { + + if (index >= s->start_index + s->array_size) + return NULL; + + if (index < s->start_index) + return NULL; + + return s->array + (index - s->start_index); +} + +int idxset_put(struct idxset*s, void *p, uint32_t *index) { + unsigned h; + struct idxset_entry *e, **a; + assert(s && p); + + assert(s->hash_func); + h = s->hash_func(p) % s->hash_table_size; + + assert(s->hash_table); + if ((e = hash_scan(s, s->hash_table[h], p))) { + if (index) + *index = e->index; + + return -1; + } + + e = malloc(sizeof(struct idxset_entry)); + assert(e); + + e->data = p; + e->index = s->index++; + e->hash_value = h; + + /* Insert into hash table */ + e->hash_next = s->hash_table[h]; + e->hash_prev = NULL; + if (s->hash_table[h]) + s->hash_table[h]->hash_prev = e; + s->hash_table[h] = e; + + /* Insert into array */ + extend_array(s, s->index); + a = array_index(s, s->index); + assert(a && !*a); + *a = e; + + /* Insert into linked list */ + e->iterate_next = NULL; + e->iterate_prev = s->iterate_list_tail; + if (s->iterate_list_tail) { + assert(s->iterate_list_head); + s->iterate_list_tail->iterate_next = e; + } else { + assert(!s->iterate_list_head); + s->iterate_list_head = e; + } + s->iterate_list_tail = e; + + s->n_entries++; + assert(s->n_entries >= 1); + + if (index) + *index = e->index; + + return 0; +} + +void* idxset_get_by_index(struct idxset*s, uint32_t index) { + struct idxset_entry **a; + assert(s); + + if (!(a = array_index(s, index))) + return NULL; + + return (*a)->data; +} + +void* idxset_get_by_data(struct idxset*s, void *p, uint32_t *index) { + unsigned h; + struct idxset_entry *e; + assert(s && p); + + assert(s->hash_func); + h = s->hash_func(p) % s->hash_table_size; + + assert(s->hash_table); + if (!(e = hash_scan(s, s->hash_table[h], p))) + return NULL; + + if (index) + *index = e->index; + + return e->data; +} + +static void remove_entry(struct idxset *s, struct idxset_entry *e) { + struct idxset_entry **a; + assert(s && e); + + /* Remove from array */ + a = array_index(s, s->index); + assert(a && *a == e); + *a = NULL; + + /* Remove from linked list */ + if (e->iterate_next) + e->iterate_next->iterate_prev = e->iterate_prev; + else + s->iterate_list_tail = e->iterate_prev; + + if (e->iterate_prev) + e->iterate_prev->iterate_next = e->iterate_next; + else + s->iterate_list_head = e->iterate_next; + + /* Remove from hash table */ + if (e->hash_next) + e->hash_next->hash_prev = e->hash_prev; + + if (e->hash_prev) + e->hash_prev->hash_next = e->hash_next; + else + s->hash_table[e->hash_value] = e->hash_next; + + if (s->rrobin == e) + s->rrobin = NULL; + + free(e); + + assert(s->n_entries >= 1); + s->n_entries--; +} + +void* idxset_remove_by_index(struct idxset*s, uint32_t index) { + struct idxset_entry **a; + void *data; + + assert(s); + + if (!(a = array_index(s, index))) + return NULL; + + data = (*a)->data; + remove_entry(s, *a); + + return data; +} + +void* idxset_remove_by_data(struct idxset*s, void *data, uint32_t *index) { + struct idxset_entry *e; + unsigned h; + + assert(s->hash_func); + h = s->hash_func(data) % s->hash_table_size; + + assert(s->hash_table); + if (!(e = hash_scan(s, s->hash_table[h], data))) + return NULL; + + data = e->data; + if (index) + *index = e->index; + + remove_entry(s, e); + + return data; +} + +void* idxset_rrobin(struct idxset *s, uint32_t *index) { + assert(s && index); + + if (s->rrobin) + s->rrobin = s->rrobin->iterate_next; + + if (!s->rrobin) + s->rrobin = s->iterate_list_head; + + if (!s->rrobin) + return NULL; + + if (index) + *index = s->rrobin->index; + + return s->rrobin->data; +} + +int idxset_foreach(struct idxset*s, int (*func)(void *p, uint32_t index, int *del, void*userdata), void *userdata) { + struct idxset_entry *e; + assert(s && func); + + e = s->iterate_list_head; + while (e) { + int del = 0, r; + struct idxset_entry *n = e->iterate_next; + + r = func(e->data, e->index, &del, userdata); + + if (del) + remove_entry(s, e); + + if (r < 0) + return r; + + e = n; + } + + return 0; +} + +unsigned idxset_ncontents(struct idxset*s) { + assert(s); + return s->n_entries; +} + +int idxset_isempty(struct idxset *s) { + assert(s); + return s->n_entries == 0; +} diff --git a/src/idxset.h b/src/idxset.h new file mode 100644 index 00000000..f649e23e --- /dev/null +++ b/src/idxset.h @@ -0,0 +1,28 @@ +#ifndef fooidxsethfoo +#define fooidxsethfoo + +#include + +#define IDXSET_INVALID ((uint32_t) -1) + +struct idxset; + +struct idxset* idxset_new(unsigned (*hash_func) (void *p), int (*compare_func) (void*a, void*b)); +void idxset_free(struct idxset *s, void (*free_func) (void *p, void *userdata), void *userdata); + +int idxset_put(struct idxset*s, void *p, uint32_t *index); + +void* idxset_get_by_index(struct idxset*s, uint32_t index); +void* idxset_get_by_data(struct idxset*s, void *p, uint32_t *index); + +void* idxset_remove_by_index(struct idxset*s, uint32_t index); +void* idxset_remove_by_data(struct idxset*s, void *p, uint32_t *index); + +void* idxset_rrobin(struct idxset *s, uint32_t *index); + +int idxset_foreach(struct idxset*s, int (*func)(void *p, uint32_t index, int *del, void*userdata), void *userdata); + +unsigned idxset_ncontents(struct idxset*s); +int idxset_isempty(struct idxset *s); + +#endif diff --git a/src/inputstream.c b/src/inputstream.c new file mode 100644 index 00000000..c7b4b4c7 --- /dev/null +++ b/src/inputstream.c @@ -0,0 +1,50 @@ +#include +#include +#include + +#include "inputstream.h" + +struct input_stream* input_stream_new(struct sink *s, struct sample_spec *spec, const char *name) { + struct input_stream *i; + int r; + assert(s && spec); + + i = malloc(sizeof(struct input_stream)); + assert(i); + i->name = name ? strdup(name) : NULL; + i->sink = s; + i->spec = *spec; + + i->memblockq = memblockq_new(bytes_per_second(spec)*5, sample_size(spec)); + assert(i->memblockq); + + assert(s->core); + r = idxset_put(s->core->input_streams, i, &i->index); + assert(r == 0 && i->index != IDXSET_INVALID); + r = idxset_put(s->input_streams, i, NULL); + assert(r == 0); + + return i; +} + +void input_stream_free(struct input_stream* i) { + assert(i); + + memblockq_free(i->memblockq); + + assert(i->sink && i->sink->core); + idxset_remove_by_data(i->sink->core->input_streams, i, NULL); + idxset_remove_by_data(i->sink->input_streams, i, NULL); + + free(i->name); + free(i); +} + +void input_stream_notify(struct input_stream *i) { + assert(i); + + if (memblockq_is_empty(i->memblockq)) + return; + + sink_notify(i->sink); +} diff --git a/src/inputstream.h b/src/inputstream.h new file mode 100644 index 00000000..0353799e --- /dev/null +++ b/src/inputstream.h @@ -0,0 +1,25 @@ +#ifndef fooinputstreamhfoo +#define fooinputstreamhfoo + +#include + +#include "sink.h" +#include "sample.h" +#include "memblockq.h" + +struct input_stream { + char *name; + uint32_t index; + + struct sink *sink; + struct sample_spec spec; + + struct memblockq *memblockq; +}; + +struct input_stream* input_stream_new(struct sink *s, struct sample_spec *spec, const char *name); +void input_stream_free(struct input_stream* i); + +void input_stream_notify(struct input_stream *i); + +#endif diff --git a/src/iochannel.c b/src/iochannel.c new file mode 100644 index 00000000..db9717a9 --- /dev/null +++ b/src/iochannel.c @@ -0,0 +1,158 @@ +#include +#include +#include +#include + +#include "iochannel.h" + +struct iochannel { + int ifd, ofd; + struct mainloop* mainloop; + + void (*callback)(struct iochannel*io, void *userdata); + void*userdata; + + int readable; + int writable; + + struct mainloop_source* input_source, *output_source; +}; + +static void enable_mainloop_sources(struct iochannel *io) { + assert(io); + + if (io->input_source == io->output_source) { + enum mainloop_io_event e = MAINLOOP_IO_EVENT_NULL; + assert(io->input_source); + + if (!io->readable) + e |= MAINLOOP_IO_EVENT_IN; + if (!io->writable) + e |= MAINLOOP_IO_EVENT_OUT; + + mainloop_source_io_set_events(io->input_source, e); + } else { + if (io->input_source) + mainloop_source_io_set_events(io->input_source, io->readable ? MAINLOOP_IO_EVENT_NULL : MAINLOOP_IO_EVENT_IN); + if (io->output_source) + mainloop_source_io_set_events(io->output_source, io->writable ? MAINLOOP_IO_EVENT_NULL : MAINLOOP_IO_EVENT_OUT); + } +} + +static void callback(struct mainloop_source*s, int fd, enum mainloop_io_event events, void *userdata) { + struct iochannel *io = userdata; + int changed; + assert(s && fd >= 0 && userdata); + + if (events & MAINLOOP_IO_EVENT_IN && !io->readable) { + io->readable = 1; + changed = 1; + } + + if (events & MAINLOOP_IO_EVENT_OUT && !io->writable) { + io->writable = 1; + changed = 1; + } + + if (changed) { + enable_mainloop_sources(io); + + if (io->callback) + io->callback(io, io->userdata); + } +} + +static void make_nonblock_fd(int fd) { + int v; + + if ((v = fcntl(fd, F_GETFL)) >= 0) + if (!(v & O_NONBLOCK)) + fcntl(fd, F_SETFL, v|O_NONBLOCK); +} + +struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd) { + struct iochannel *io; + assert(m && (ifd >= 0 || ofd >= 0)); + + io = malloc(sizeof(struct iochannel)); + io->ifd = ifd; + io->ofd = ofd; + io->mainloop = m; + + io->userdata = NULL; + io->callback = NULL; + io->readable = 0; + io->writable = 0; + + if (ifd == ofd) { + assert(ifd >= 0); + make_nonblock_fd(io->ifd); + io->input_source = io->output_source = mainloop_source_new_io(m, ifd, MAINLOOP_IO_EVENT_IN|MAINLOOP_IO_EVENT_OUT, callback, io); + } else { + + if (ifd >= 0) { + make_nonblock_fd(io->ifd); + io->input_source = mainloop_source_new_io(m, ifd, MAINLOOP_IO_EVENT_IN, callback, io); + } else + io->input_source = NULL; + + if (ofd >= 0) { + make_nonblock_fd(io->ofd); + io->output_source = mainloop_source_new_io(m, ofd, MAINLOOP_IO_EVENT_OUT, callback, io); + } else + io->output_source = NULL; + } + + return io; +} + +void iochannel_free(struct iochannel*io) { + assert(io); + + if (io->ifd >= 0) + close(io->ifd); + if (io->ofd >= 0 && io->ofd != io->ifd) + close(io->ofd); + + if (io->input_source) + mainloop_source_free(io->input_source); + if (io->output_source) + mainloop_source_free(io->output_source); + + free(io); +} + +int iochannel_is_readable(struct iochannel*io) { + assert(io); + return io->readable; +} + +int iochannel_is_writable(struct iochannel*io) { + assert(io); + return io->writable; +} + +ssize_t iochannel_write(struct iochannel*io, const void*data, size_t l) { + ssize_t r; + assert(io && data && l && io->ofd >= 0); + + if ((r = write(io->ofd, data, l)) >= 0) { + io->writable = 0; + enable_mainloop_sources(io); + } + + return r; +} + +ssize_t iochannel_read(struct iochannel*io, void*data, size_t l) { + ssize_t r; + + assert(io && data && l && io->ifd >= 0); + + if ((r = read(io->ifd, data, l)) >= 0) { + io->readable = 0; + enable_mainloop_sources(io); + } + + return r; +} diff --git a/src/iochannel.h b/src/iochannel.h new file mode 100644 index 00000000..f97fabba --- /dev/null +++ b/src/iochannel.h @@ -0,0 +1,20 @@ +#ifndef fooiochannelhfoo +#define fooiochannelhfoo + +#include +#include "mainloop.h" + +struct iochannel; + +struct iochannel* iochannel_new(struct mainloop*m, int ifd, int ofd); +void iochannel_free(struct iochannel*io); + +ssize_t iochannel_write(struct iochannel*io, const void*data, size_t l); +ssize_t iochannel_read(struct iochannel*io, void*data, size_t l); + +int iochannel_is_readable(struct iochannel*io); +int iochannel_is_writable(struct iochannel*io); + +void iochannel_set_callback(struct iochannel*io, void (*callback)(struct iochannel*io, void *userdata), void *userdata); + +#endif diff --git a/src/main.c b/src/main.c new file mode 100644 index 00000000..3104c264 --- /dev/null +++ b/src/main.c @@ -0,0 +1,26 @@ +#include +#include + +#include "core.h" +#include "mainloop.h" +#include "module.h" + +int main(int argc, char *argv[]) { + struct mainloop *m; + struct core *c; + + m = mainloop_new(); + assert(m); + c = core_new(m); + assert(c); + + module_load(c, "sink-pipe", NULL); + module_load(c, "protocol-simple-tcp", NULL); + + mainloop_run(m); + + core_free(c); + mainloop_free(m); + + return 0; +} diff --git a/src/mainloop.c b/src/mainloop.c new file mode 100644 index 00000000..d043ce90 --- /dev/null +++ b/src/mainloop.c @@ -0,0 +1,331 @@ +#include +#include +#include +#include + +#include "mainloop.h" + +struct mainloop_source { + struct mainloop_source *next; + struct mainloop *mainloop; + enum mainloop_source_type type; + + int enabled; + int dead; + void *userdata; + + struct { + int fd; + enum mainloop_io_event events; + void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata); + struct pollfd pollfd; + } io; + + struct { + void (*callback)(struct mainloop_source*s, void *userdata); + } prepare; + + struct { + void (*callback)(struct mainloop_source*s, void *userdata); + } idle; +}; + +struct mainloop_source_list { + struct mainloop_source *sources; + int n_sources; + int dead_sources; +}; + +struct mainloop { + struct mainloop_source_list io_sources, prepare_sources, idle_sources; + + struct pollfd *pollfds; + int max_pollfds, n_pollfds; + int rebuild_pollfds; + + int quit; + int running; +}; + +struct mainloop *mainloop_new(void) { + struct mainloop *m; + + m = malloc(sizeof(struct mainloop)); + assert(m); + memset(m, 0, sizeof(struct mainloop)); + + return m; +} + +static void free_sources(struct mainloop_source_list *l, int all) { + struct mainloop_source *s, *p; + assert(l); + + if (!l->dead_sources) + return; + + p = NULL; + s = l->sources; + while (s) { + if (all || s->dead) { + struct mainloop_source *t = s; + s = s->next; + + if (p) + p->next = s; + else + l->sources = s; + + free(t); + } else { + p = s; + s = s->next; + } + } + + l->dead_sources = 0; + + if (all) { + assert(l->sources); + l->n_sources = 0; + } +} + +void mainloop_free(struct mainloop* m) { + assert(m); + free_sources(&m->io_sources, 1); + free_sources(&m->prepare_sources, 1); + free_sources(&m->idle_sources, 1); + free(m->pollfds); +} + +static void rebuild_pollfds(struct mainloop *m) { + struct mainloop_source*s; + struct pollfd *p; + + if (m->max_pollfds < m->io_sources.n_sources) { + m->max_pollfds = m->io_sources.n_sources*2; + m->pollfds = realloc(m->pollfds, sizeof(struct pollfd)*m->max_pollfds); + } + + m->n_pollfds = 0; + p = m->pollfds; + for (s = m->io_sources.sources; s; s = s->next) { + assert(s->type == MAINLOOP_SOURCE_TYPE_IO); + if (!s->dead && s->enabled && s->io.events != MAINLOOP_IO_EVENT_NULL) { + *(p++) = s->io.pollfd; + m->n_pollfds++; + } + } +} + +static void dispatch_pollfds(struct mainloop *m) { + int i; + struct pollfd *p; + struct mainloop_source *s; + /* This loop assumes that m->sources and m->pollfds have the same + * order and that m->pollfds is a subset of m->sources! */ + + s = m->io_sources.sources; + for (p = m->pollfds, i = 0; i < m->n_pollfds; p++, i++) { + for (;;) { + assert(s && s->type == MAINLOOP_SOURCE_TYPE_IO); + + if (p->fd == s->io.fd) { + if (!s->dead && s->enabled) { + enum mainloop_io_event e = (p->revents & POLLIN ? MAINLOOP_IO_EVENT_IN : 0) | (p->revents & POLLOUT ? MAINLOOP_IO_EVENT_OUT : 0); + if (e) { + assert(s->io.callback); + s->io.callback(s, s->io.fd, e, s->userdata); + } + } + + break; + } + s = s->next; + } + } +} + +int mainloop_iterate(struct mainloop *m, int block) { + struct mainloop_source *s; + int c; + assert(m && !m->running); + + if(m->quit) + return m->quit; + + free_sources(&m->io_sources, 0); + free_sources(&m->prepare_sources, 0); + free_sources(&m->idle_sources, 0); + + for (s = m->prepare_sources.sources; s; s = s->next) { + assert(!s->dead && s->type == MAINLOOP_SOURCE_TYPE_PREPARE); + if (s->enabled) { + assert(s->prepare.callback); + s->prepare.callback(s, s->userdata); + } + } + + if (m->rebuild_pollfds) + rebuild_pollfds(m); + + m->running = 1; + + if ((c = poll(m->pollfds, m->n_pollfds, (block && !m->idle_sources.n_sources) ? -1 : 0)) > 0) + dispatch_pollfds(m); + else if (c == 0) { + for (s = m->idle_sources.sources; s; s = s->next) { + assert(!s->dead && s->type == MAINLOOP_SOURCE_TYPE_IDLE); + if (s->enabled) { + assert(s->idle.callback); + s->idle.callback(s, s->userdata); + } + } + } + + m->running = 0; + return c < 0 ? -1 : 0; +} + +int mainloop_run(struct mainloop *m) { + int r; + while (!(r = mainloop_iterate(m, 1))); + return r; +} + +void mainloop_quit(struct mainloop *m, int r) { + assert(m); + m->quit = r; +} + +static struct mainloop_source_list* get_source_list(struct mainloop *m, enum mainloop_source_type type) { + struct mainloop_source_list *l; + + switch(type) { + case MAINLOOP_SOURCE_TYPE_IO: + l = &m->io_sources; + break; + case MAINLOOP_SOURCE_TYPE_PREPARE: + l = &m->prepare_sources; + break; + case MAINLOOP_SOURCE_TYPE_IDLE: + l = &m->idle_sources; + break; + default: + l = NULL; + break; + } + + return l; +} + +static struct mainloop_source *source_new(struct mainloop*m, enum mainloop_source_type type) { + struct mainloop_source_list *l; + struct mainloop_source* s; + assert(m); + + s = malloc(sizeof(struct mainloop_source)); + assert(s); + memset(s, 0, sizeof(struct mainloop_source)); + + s->type = type; + s->mainloop = m; + + l = get_source_list(m, type); + assert(l); + + s->next = l->sources; + l->sources = s; + l->n_sources++; + return s; +} + +struct mainloop_source* mainloop_source_new_io(struct mainloop*m, int fd, enum mainloop_io_event event, void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata), void *userdata) { + struct mainloop_source* s; + assert(m && fd>=0 && callback); + + s = source_new(m, MAINLOOP_SOURCE_TYPE_IO); + + s->io.fd = fd; + s->io.events = event; + s->io.callback = callback; + s->userdata = userdata; + s->io.pollfd.fd = fd; + s->io.pollfd.events = (event & MAINLOOP_IO_EVENT_IN ? POLLIN : 0) | (event & MAINLOOP_IO_EVENT_OUT ? POLLOUT : 0); + s->io.pollfd.revents = 0; + + s->enabled = 1; + + m->rebuild_pollfds = 1; + return s; +} + +struct mainloop_source* mainloop_source_new_prepare(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata) { + struct mainloop_source* s; + assert(m && callback); + + s = source_new(m, MAINLOOP_SOURCE_TYPE_PREPARE); + + s->prepare.callback = callback; + s->userdata = userdata; + s->enabled = 1; + return s; +} + +struct mainloop_source* mainloop_source_new_idle(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata) { + struct mainloop_source* s; + assert(m && callback); + + s = source_new(m, MAINLOOP_SOURCE_TYPE_IDLE); + + s->prepare.callback = callback; + s->userdata = userdata; + s->enabled = 1; + return s; +} + +void mainloop_source_free(struct mainloop_source*s) { + struct mainloop_source_list *l; + assert(s && !s->dead); + s->dead = 1; + + assert(s->mainloop); + l = get_source_list(s->mainloop, s->type); + assert(l); + + l->n_sources--; + l->dead_sources = 1; + + if (s->type == MAINLOOP_SOURCE_TYPE_IO) + s->mainloop->rebuild_pollfds = 1; +} + +void mainloop_source_enable(struct mainloop_source*s, int b) { + assert(s && !s->dead); + + if (s->type == MAINLOOP_SOURCE_TYPE_IO && ((s->enabled && !b) || (!s->enabled && b))) { + assert(s->mainloop); + s->mainloop->rebuild_pollfds = 1; + } + + s->enabled = b; +} + +void mainloop_source_io_set_events(struct mainloop_source*s, enum mainloop_io_event events) { + assert(s && !s->dead && s->type == MAINLOOP_SOURCE_TYPE_IO); + + if ((s->io.events && !events) || (!s->io.events && events)) { + assert(s->mainloop); + s->mainloop->rebuild_pollfds = 1; + } + + s->io.events = events; + s->io.pollfd.events = ((events & MAINLOOP_IO_EVENT_IN) ? POLLIN : 0) | ((events & MAINLOOP_IO_EVENT_OUT) ? POLLOUT : 0); +} + +struct mainloop *mainloop_source_get_mainloop(struct mainloop_source *s) { + assert(s); + + return s->mainloop; +} diff --git a/src/mainloop.h b/src/mainloop.h new file mode 100644 index 00000000..72376c72 --- /dev/null +++ b/src/mainloop.h @@ -0,0 +1,38 @@ +#ifndef foomainloophfoo +#define foomainloophfoo + +struct mainloop; +struct mainloop_source; + +enum mainloop_io_event { + MAINLOOP_IO_EVENT_NULL = 0, + MAINLOOP_IO_EVENT_IN = 1, + MAINLOOP_IO_EVENT_OUT = 2, + MAINLOOP_IO_EVENT_BOTH = 3 +}; + +enum mainloop_source_type { + MAINLOOP_SOURCE_TYPE_IO, + MAINLOOP_SOURCE_TYPE_PREPARE, + MAINLOOP_SOURCE_TYPE_IDLE +}; + +struct mainloop *mainloop_new(void); +void mainloop_free(struct mainloop* m); + +int mainloop_iterate(struct mainloop *m, int block); +int mainloop_run(struct mainloop *m); +void mainloop_quit(struct mainloop *m, int r); + +struct mainloop_source* mainloop_source_new_io(struct mainloop*m, int fd, enum mainloop_io_event event, void (*callback)(struct mainloop_source*s, int fd, enum mainloop_io_event event, void *userdata), void *userdata); +struct mainloop_source* mainloop_source_new_prepare(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata); +struct mainloop_source* mainloop_source_new_idle(struct mainloop*m, void (*callback)(struct mainloop_source *s, void*userdata), void*userdata); + +void mainloop_source_free(struct mainloop_source*s); +void mainloop_source_enable(struct mainloop_source*s, int b); + +void mainloop_source_io_set_events(struct mainloop_source*s, enum mainloop_io_event event); + +struct mainloop *mainloop_source_get_mainloop(struct mainloop_source *s); + +#endif diff --git a/src/memblock.c b/src/memblock.c new file mode 100644 index 00000000..3bef4944 --- /dev/null +++ b/src/memblock.c @@ -0,0 +1,67 @@ +#include +#include +#include + +#include "memblock.h" + +struct memblock *memblock_new(size_t length) { + struct memblock *b = malloc(sizeof(struct memblock)+length); + b->type = MEMBLOCK_APPENDED; + b->ref = 1; + b->length = length; + b->data = b+1; + return b; +} + +struct memblock *memblock_new_fixed(void *d, size_t length) { + struct memblock *b = malloc(sizeof(struct memblock)); + b->type = MEMBLOCK_FIXED; + b->ref = 1; + b->length = length; + b->data = d; + return b; +} + +struct memblock *memblock_new_dynamic(void *d, size_t length) { + struct memblock *b = malloc(sizeof(struct memblock)); + b->type = MEMBLOCK_DYNAMIC; + b->ref = 1; + b->length = length; + b->data = d; + return b; +} + +struct memblock* memblock_ref(struct memblock*b) { + assert(b && b->ref >= 1); + b->ref++; + return b; +} + +void memblock_unref(struct memblock*b) { + assert(b && b->ref >= 1); + b->ref--; + + if (b->ref == 0) { + if (b->type == MEMBLOCK_DYNAMIC) + free(b->data); + free(b); + } +} + +void memblock_unref_fixed(struct memblock *b) { + void *d; + + assert(b && b->ref >= 1); + + if (b->ref == 1) { + memblock_unref(b); + return; + } + + d = malloc(b->length); + assert(d); + memcpy(d, b->data, b->length); + b->data = d; + b->type = MEMBLOCK_DYNAMIC; +} + diff --git a/src/memblock.h b/src/memblock.h new file mode 100644 index 00000000..48e87286 --- /dev/null +++ b/src/memblock.h @@ -0,0 +1,31 @@ +#ifndef foomemblockhfoo +#define foomemblockhfoo + +#include + +enum memblock_type { MEMBLOCK_FIXED, MEMBLOCK_APPENDED, MEMBLOCK_DYNAMIC }; + +struct memblock { + enum memblock_type type; + unsigned ref; + size_t length; + void *data; +}; + +struct memchunk { + struct memblock *memblock; + size_t index, length; +}; + +struct memblock *memblock_new(size_t length); +struct memblock *memblock_new_fixed(void *data, size_t length); +struct memblock *memblock_new_dynamic(void *data, size_t length); + +void memblock_unref(struct memblock*b); +struct memblock* memblock_ref(struct memblock*b); + +void memblock_unref_fixed(struct memblock*b); + +#define memblock_assert_exclusive(b) assert((b)->ref == 1) + +#endif diff --git a/src/memblockq.c b/src/memblockq.c new file mode 100644 index 00000000..1424c556 --- /dev/null +++ b/src/memblockq.c @@ -0,0 +1,156 @@ +#include +#include + +#include "memblockq.h" + +struct memblock_list { + struct memblock_list *next; + struct memchunk chunk; +}; + +struct memblockq { + struct memblock_list *blocks, *blocks_tail; + unsigned n_blocks; + size_t total_length; + size_t maxlength; + size_t base; +}; + +struct memblockq* memblockq_new(size_t maxlength, size_t base) { + struct memblockq* bq; + assert(maxlength && base); + + bq = malloc(sizeof(struct memblockq)); + assert(bq); + bq->blocks = bq->blocks_tail = 0; + bq->n_blocks = 0; + bq->total_length = 0; + bq->base = base; + bq->maxlength = ((maxlength+base-1)/base)*base; + assert(bq->maxlength >= base); + return bq; +} + +void memblockq_free(struct memblockq* bq) { + struct memblock_list *l; + assert(bq); + + while ((l = bq->blocks)) { + bq->blocks = l->next; + memblock_unref(l->chunk.memblock); + free(l); + } + + free(bq); +} + +void memblockq_push(struct memblockq* bq, struct memchunk *chunk, size_t delta) { + struct memblock_list *q; + assert(bq && chunk && chunk->memblock && chunk->index); + + q = malloc(sizeof(struct memblock_list)); + assert(q); + + q->chunk = *chunk; + memblock_ref(q->chunk.memblock); + assert(q->chunk.index+q->chunk.length <= q->chunk.memblock->length); + q->next = NULL; + + if (bq->blocks_tail) + bq->blocks_tail->next = q; + else + bq->blocks = q; + + bq->blocks_tail = q; + + bq->n_blocks++; + bq->total_length += chunk->length; + + memblockq_shorten(bq, bq->maxlength); +} + +int memblockq_peek(struct memblockq* bq, struct memchunk *chunk) { + assert(bq && chunk); + + if (!bq->blocks) + return -1; + + *chunk = bq->blocks->chunk; + memblock_ref(chunk->memblock); + return 0; +} + +int memblockq_pop(struct memblockq* bq, struct memchunk *chunk) { + struct memblock_list *q; + + assert(bq && chunk); + + if (!bq->blocks) + return -1; + + q = bq->blocks; + bq->blocks = bq->blocks->next; + + *chunk = q->chunk; + + bq->n_blocks--; + bq->total_length -= chunk->length; + + free(q); + return 0; +} + +void memblockq_drop(struct memblockq *bq, size_t length) { + assert(bq); + + while (length > 0) { + size_t l = length; + assert(bq->blocks && bq->total_length >= length); + + if (l > bq->blocks->chunk.length) + l = bq->blocks->chunk.length; + + bq->blocks->chunk.index += l; + bq->blocks->chunk.length -= l; + bq->total_length -= l; + + if (bq->blocks->chunk.length == 0) { + struct memblock_list *q; + + q = bq->blocks; + bq->blocks = bq->blocks->next; + memblock_unref(q->chunk.memblock); + free(q); + + bq->n_blocks--; + } + + length -= l; + } +} + +void memblockq_shorten(struct memblockq *bq, size_t length) { + size_t l; + assert(bq); + + if (bq->total_length <= length) + return; + + l = bq->total_length - length; + l /= bq->base; + l *= bq->base; + + memblockq_drop(bq, l); +} + + +void memblockq_empty(struct memblockq *bq) { + assert(bq); + memblockq_shorten(bq, 0); +} + +int memblockq_is_empty(struct memblockq *bq) { + assert(bq); + + return bq->total_length >= bq->base; +} diff --git a/src/memblockq.h b/src/memblockq.h new file mode 100644 index 00000000..75c5e59e --- /dev/null +++ b/src/memblockq.h @@ -0,0 +1,24 @@ +#ifndef foomemblockqhfoo +#define foomemblockqhfoo + +#include + +#include "memblock.h" + +struct memblockq; + +struct memblockq* memblockq_new(size_t maxlength, size_t base); +void memblockq_free(struct memblockq* bq); + +void memblockq_push(struct memblockq* bq, struct memchunk *chunk, size_t delta); + +int memblockq_pop(struct memblockq* bq, struct memchunk *chunk); +int memblockq_peek(struct memblockq* bq, struct memchunk *chunk); +void memblockq_drop(struct memblockq *bq, size_t length); + +void memblockq_shorten(struct memblockq *bq, size_t length); +void memblockq_empty(struct memblockq *bq); + +int memblockq_is_empty(struct memblockq *bq); + +#endif diff --git a/src/module.c b/src/module.c new file mode 100644 index 00000000..bcd0b6c0 --- /dev/null +++ b/src/module.c @@ -0,0 +1,98 @@ +#include +#include + +#include "module.h" + +struct module* module_load(struct core *c, const char *name, const char *argument) { + struct module *m = NULL; + + assert(c && name); + + m = malloc(sizeof(struct module)); + assert(m); + + if (!(m->dl = lt_dlopenext(name))) + goto fail; + + if (!(m->init = lt_dlsym(m->dl, "module_init"))) + goto fail; + + if (!(m->done = lt_dlsym(m->dl, "module_done"))) + goto fail; + + m->name = strdup(name); + m->argument = argument ? strdup(argument) : NULL; + m->userdata = NULL; + + assert(m->init); + if (m->init(c, m) < 0) + goto fail; + + if (!c->modules) + c->modules = idxset_new(NULL, NULL); + + assert(c->modules); + r = idxset_put(c->modules, m, &m->index); + assert(r >= 0 && m->index != IDXSET_INVALID); + return m; + +fail: + if (m) { + if (m->dl) + lt_dlclose(m->dl); + + free(m); + } + + return NULL; +} + +static void module_free(struct module *m) { + assert(m && m->done); + m->done(c, m); + + lt_dlcose(m->dl); + free(m->name); + free(m->argument); + free(m); +} + +void module_unload(struct core *c, struct module *m) { + struct module *m; + assert(c && index != IDXSET_INVALID); + + assert(c->modules); + if (!(m = idxset_remove_by_data(c->modules, m, NULL))) + return; + + module_free(m); +} + +void module_unload_by_index(struct core *c, guint32_t index) { + struct module *m; + assert(c && index != IDXSET_INVALID); + + assert(c->modules); + if (!(m = idxset_remove_by_index(c->modules, index))) + return; + + module_free(m); +} + + +void free_callback(void *p, void *userdata) { + struct module *m = p; + assert(m); + module_free(m); +} + +void module_unload_all(struct core *c) { + assert(c); + + if (!c->modules) + return; + + idxset_free(c->modules, free_callback, NULL); + c->modules = NULL; +} + diff --git a/src/module.h b/src/module.h new file mode 100644 index 00000000..d0dfa045 --- /dev/null +++ b/src/module.h @@ -0,0 +1,27 @@ +#ifndef foomodulehfoo +#define foomodulehfoo + +#include +#include + +#include "core.h" + +struct module { + char *name, *argument; + uint32_t index; + + lt_dlhandle *dl; + + int (*init)(struct core *c, struct module*m); + void (*done)(struct core *c, struct module*m); + + void *userdata; +}; + +struct module* module_load(struct core *c, const char *name, const char*argument); +void module_unload(struct core *c, struct module *m); +void module_unload_by_index(struct core *c, uint32_t index); + +void module_unload_all(struct core *c); + +#endif diff --git a/src/oss.c b/src/oss.c new file mode 100644 index 00000000..42e60360 --- /dev/null +++ b/src/oss.c @@ -0,0 +1,30 @@ +#include "module.h" + +struct userdata { + struct sink *sink; + struct source *source; + int fd; +}; + +int module_init(struct core *c, struct module*m) { + struct userdata *u; + assert(c && m); + + u = malloc(sizeof(struct userdata)); + assert(u); + memset(u, 0, sizeof(struct userdata)); + m->userdata = u; + + return 0; +} + +void module_done(struct core *c, struct module*m) { + struct userdata *u; + assert(c && m); + + u = m->userdata; + + sink_free(u->sink); + source_free(u->source); + free(u); +} diff --git a/src/outputstream.c b/src/outputstream.c new file mode 100644 index 00000000..ffec77da --- /dev/null +++ b/src/outputstream.c @@ -0,0 +1,41 @@ +#include +#include +#include + +#include "outputstream.h" + +struct output_stream* output_stream_new(struct source *s, struct sample_spec *spec, const char *name) { + struct output_stream *o; + int r; + assert(s && spec); + + o = malloc(sizeof(struct output_stream)); + assert(o); + o->name = name ? strdup(name) : NULL; + o->source = s; + o->spec = *spec; + + o->memblockq = memblockq_new(bytes_per_second(spec)*5, sample_size(spec)); + assert(o->memblockq); + + assert(s->core); + r = idxset_put(s->core->output_streams, o, &o->index); + assert(r == 0 && o->index != IDXSET_INVALID); + r = idxset_put(s->output_streams, o, NULL); + assert(r == 0); + + return o; +} + +void output_stream_free(struct output_stream* o) { + assert(o); + + memblockq_free(o->memblockq); + + assert(o->source && o->source->core); + idxset_remove_by_data(o->source->core->output_streams, o, NULL); + idxset_remove_by_data(o->source->output_streams, o, NULL); + + free(o->name); + free(o); +} diff --git a/src/outputstream.h b/src/outputstream.h new file mode 100644 index 00000000..41054341 --- /dev/null +++ b/src/outputstream.h @@ -0,0 +1,22 @@ +#ifndef foooutputstreamhfoo +#define foooutputstreamhfoo + +#include +#include "source.h" +#include "sample.h" +#include "memblockq.h" + +struct output_stream { + char *name; + uint32_t index; + + struct source *source; + struct sample_spec spec; + + struct memblockq *memblockq; +}; + +struct output_stream* output_stream_new(struct source *s, struct sample_spec *spec, const char *name); +void output_stream_free(struct output_stream* o); + +#endif diff --git a/src/packet.c b/src/packet.c new file mode 100644 index 00000000..086e4b2a --- /dev/null +++ b/src/packet.c @@ -0,0 +1,29 @@ +#include +#include + +#include "packet.h" + +struct packet* packet_new(uint32_t length) { + struct packet *p; + assert(length); + p = malloc(sizeof(struct packet)+length); + assert(p); + + p->ref = 1; + p->length = length; + return p; +} + +struct packet* packet_ref(struct packet *p) { + assert(p && p->ref >= 1); + p->ref++; + return p; +} + +void packet_unref(struct packet *p) { + assert(p && p->ref >= 1); + p->ref--; + + if (p->ref == 0) + free(p); +} diff --git a/src/packet.h b/src/packet.h new file mode 100644 index 00000000..781c0e66 --- /dev/null +++ b/src/packet.h @@ -0,0 +1,18 @@ +#ifndef foopackethfoo +#define foopackethfoo + +#include +#include + +struct packet { + unsigned ref; + size_t length; + uint8_t data[]; +}; + +struct packet* packet_new(uint32_t length); + +struct packet* packet_ref(struct packet *p); +void packet_unref(struct packet *p); + +#endif diff --git a/src/protocol-native-tcp.c b/src/protocol-native-tcp.c new file mode 100644 index 00000000..b33f3e15 --- /dev/null +++ b/src/protocol-native-tcp.c @@ -0,0 +1,19 @@ +#include "module.h" + +int module_init(struct core *c, struct module*m) { + struct socket_server *s; + assert(c && m); + + if (!(s = socket_server_new_ipv4(c->mainloop, INADDR_LOOPBACK, 4711))) + return -1; + + m->userdata = protocol_native_new(s); + assert(m->userdata); + return 0; +} + +void module_done(struct core *c, struct module*m) { + assert(c && m); + + protocol_native_free(m->userdata); +} diff --git a/src/protocol-native-unix.c b/src/protocol-native-unix.c new file mode 100644 index 00000000..a18965cd --- /dev/null +++ b/src/protocol-native-unix.c @@ -0,0 +1,27 @@ +#include "module.h" + +int module_init(struct core *c, struct module*m) { + struct fn[PATH_MAX]; + struct socket_server *s; + char *t; + assert(c && m); + + if (!(t = getenv("TMP"))) + if (!(t = getenv("TEMP"))) + t = "/tmp"; + + snprintf(fn, sizeof(fn), "%s/foosock", t); + + if (!(s = socket_server_new_unix(c->mainloop, fn))) + return -1; + + m->userdata = protocol_native_new(s); + assert(m->userdata); + return 0; +} + +void module_done(struct core *c, struct module*m) { + assert(c && m); + + protocol_native_free(m->userdata); +} diff --git a/src/protocol-native.c b/src/protocol-native.c new file mode 100644 index 00000000..bdb69355 --- /dev/null +++ b/src/protocol-native.c @@ -0,0 +1,49 @@ +#include "protocol-native.h" + +struct protocol_native { + struct socket_server*server; + struct idxset *connection; +}; + +struct stream_info { + guint32_t tag; + + union { + struct output_stream *output_stream; + struct input_stream *input_stream; + } +}; + +struct connection { + struct client *client; + struct serializer *serializer; + + +}; + +static void on_connection(struct socket_server *server, struct iochannel *io, void *userdata) { + struct protocol_native *p = userdata; + assert(server && io && p && p->server == server); + + +} + +struct protocol_native* protocol_native(struct socket_server *server) { + struct protocol_native *p; + assert(server); + + p = malloc(sizeof(struct protocol_native)); + assert(p); + + p->server = server; + socket_server_set_callback(p->server, callback, p); + + return p; +} + +void protocol_native_free(struct protocol_native *p) { + assert(p); + + socket_server_free(p->server); + free(p); +} diff --git a/src/protocol-native.h b/src/protocol-native.h new file mode 100644 index 00000000..bdad03b4 --- /dev/null +++ b/src/protocol-native.h @@ -0,0 +1,9 @@ +#ifndef fooprotocolnativehfoo +#define fooprotocolnativehfoo + +struct protocol_native; + +struct protocol_native* protocol_native(struct socket_server *server); +void protocol_native_free(struct protocol_native *n); + +#endif diff --git a/src/protocol-simple-tcp.c b/src/protocol-simple-tcp.c new file mode 100644 index 00000000..e71d7142 --- /dev/null +++ b/src/protocol-simple-tcp.c @@ -0,0 +1,24 @@ +#include +#include + +#include "module.h" +#include "socket-server.h" +#include "protocol-simple.h" + +int module_init(struct core *c, struct module*m) { + struct socket_server *s; + assert(c && m); + + if (!(s = socket_server_new_ipv4(c->mainloop, INADDR_LOOPBACK, 4712))) + return -1; + + m->userdata = protocol_simple_new(c, s, PROTOCOL_SIMPLE_PLAYBACK); + assert(m->userdata); + return 0; +} + +void module_done(struct core *c, struct module*m) { + assert(c && m); + + protocol_simple_free(m->userdata); +} diff --git a/src/protocol-simple.c b/src/protocol-simple.c new file mode 100644 index 00000000..3335bc14 --- /dev/null +++ b/src/protocol-simple.c @@ -0,0 +1,173 @@ +#include +#include +#include +#include +#include +#include + +#include "inputstream.h" +#include "outputstream.h" +#include "protocol-simple.h" +#include "client.h" + +struct connection { + struct protocol_simple *protocol; + struct iochannel *io; + struct input_stream *istream; + struct output_stream *ostream; + struct client *client; +}; + +struct protocol_simple { + struct core *core; + struct socket_server*server; + struct idxset *connections; + enum protocol_simple_mode mode; +}; + +#define BUFSIZE PIPE_BUF + +static void free_connection(void *data, void *userdata) { + struct connection *c = data; + assert(data); + + if (c->istream) + input_stream_free(c->istream); + if (c->ostream) + output_stream_free(c->ostream); + + client_free(c->client); + + iochannel_free(c->io); + free(c); +} + +static void io_callback(struct iochannel*io, void *userdata) { + struct connection *c = userdata; + assert(io && c); + + if (c->istream && iochannel_is_readable(io)) { + struct memchunk chunk; + ssize_t r; + + chunk.memblock = memblock_new(BUFSIZE); + assert(chunk.memblock); + + if ((r = iochannel_read(io, chunk.memblock->data, BUFSIZE)) <= 0) { + fprintf(stderr, "read(): %s\n", r == 0 ? "EOF" : strerror(errno)); + memblock_unref(chunk.memblock); + goto fail; + } + + chunk.memblock->length = r; + chunk.length = r; + chunk.index = 0; + + memblockq_push(c->istream->memblockq, &chunk, 0); + input_stream_notify(c->istream); + memblock_unref(chunk.memblock); + } + + if (c->ostream && iochannel_is_writable(io)) { + struct memchunk chunk; + ssize_t r; + + memblockq_peek(c->ostream->memblockq, &chunk); + assert(chunk.memblock && chunk.length); + + if ((r = iochannel_write(io, chunk.memblock->data+chunk.index, chunk.length)) < 0) { + fprintf(stderr, "write(): %s\n", strerror(errno)); + memblock_unref(chunk.memblock); + goto fail; + } + + memblockq_drop(c->ostream->memblockq, r); + memblock_unref(chunk.memblock); + } + + return; + +fail: + idxset_remove_by_data(c->protocol->connections, c, NULL); + free_connection(c, NULL); +} + +static void on_connection(struct socket_server*s, struct iochannel *io, void *userdata) { + struct protocol_simple *p = userdata; + struct connection *c = NULL; + assert(s && io && p); + + c = malloc(sizeof(struct connection)); + assert(c); + c->io = io; + c->istream = NULL; + c->ostream = NULL; + c->protocol = p; + + if (p->mode & PROTOCOL_SIMPLE_RECORD) { + struct source *source; + + if (!(source = core_get_default_source(p->core))) { + fprintf(stderr, "Failed to get default source.\n"); + goto fail; + } + + c->ostream = output_stream_new(source, &DEFAULT_SAMPLE_SPEC, c->client->name); + assert(c->ostream); + } + + if (p->mode & PROTOCOL_SIMPLE_PLAYBACK) { + struct sink *sink; + + if (!(sink = core_get_default_sink(p->core))) { + fprintf(stderr, "Failed to get default sink.\n"); + goto fail; + } + + c->istream = input_stream_new(sink, &DEFAULT_SAMPLE_SPEC, c->client->name); + assert(c->istream); + } + + c->client = client_new(p->core, "SIMPLE", "Client"); + assert(c->client); + + iochannel_set_callback(c->io, io_callback, c); + idxset_put(p->connections, c, NULL); + return; + +fail: + if (c) { + if (c->istream) + input_stream_free(c->istream); + if (c->ostream) + output_stream_free(c->ostream); + + iochannel_free(c->io); + free(c); + } +} + +struct protocol_simple* protocol_simple_new(struct core *core, struct socket_server *server, enum protocol_simple_mode mode) { + struct protocol_simple* p; + assert(core && server && mode <= PROTOCOL_SIMPLE_DUPLEX && mode > 0); + + p = malloc(sizeof(struct protocol_simple)); + assert(p); + p->core = core; + p->server = server; + p->connections = idxset_new(NULL, NULL); + p->mode = mode; + + socket_server_set_callback(p->server, on_connection, p); + + return p; +} + + +void protocol_simple_free(struct protocol_simple *p) { + assert(p); + + idxset_free(p->connections, free_connection, NULL); + socket_server_free(p->server); + free(p); +} diff --git a/src/protocol-simple.h b/src/protocol-simple.h new file mode 100644 index 00000000..f6210436 --- /dev/null +++ b/src/protocol-simple.h @@ -0,0 +1,17 @@ +#ifndef fooprotocolsimplehfoo +#define fooprotocolsimplehfoo + +#include "socket-server.h" + +struct protocol_simple; + +enum protocol_simple_mode { + PROTOCOL_SIMPLE_RECORD = 1, + PROTOCOL_SIMPLE_PLAYBACK = 2, + PROTOCOL_SIMPLE_DUPLEX = 3 +}; + +struct protocol_simple* protocol_simple_new(struct core *core, struct socket_server *server, enum protocol_simple_mode mode); +void protocol_simple_free(struct protocol_simple *n); + +#endif diff --git a/src/pstream.c b/src/pstream.c new file mode 100644 index 00000000..083ebc22 --- /dev/null +++ b/src/pstream.c @@ -0,0 +1,359 @@ +#include +#include + +#include "pstream.h" +#include "queue.h" + +enum pstream_descriptor_index { + PSTREAM_DESCRIPTOR_LENGTH, + PSTREAM_DESCRIPTOR_CHANNEL, + PSTREAM_DESCRIPTOR_DELTA, + PSTREAM_DESCRIPTOR_MAX +}; + +typedef uint32_t pstream_descriptor[PSTREAM_DESCRIPTOR_MAX]; + +#define PSTREAM_DESCRIPTOR_SIZE (PSTREAM_DESCRIPTOR_MAX*sizeof(uint32_t)) +#define FRAME_SIZE_MAX (1024*64) + +struct item_info { + enum { PSTREAM_ITEM_PACKET, PSTREAM_ITEM_MEMBLOCK } type; + + /* memblock info */ + struct memchunk chunk; + uint32_t channel; + int32_t delta; + + /* packet info */ + struct packet *packet; +}; + +struct pstream { + struct mainloop *mainloop; + struct mainloop_source *mainloop_source; + struct iochannel *io; + struct queue *send_queue; + + int dead; + + struct { + struct item_info* current; + pstream_descriptor descriptor; + void *data; + size_t index; + } write; + + void (*send_callback) (struct pstream *p, void *userdata); + void *send_callback_userdata; + + struct { + struct memblock *memblock; + struct packet *packet; + pstream_descriptor descriptor; + void *data; + size_t index; + } read; + + void (*recieve_packet_callback) (struct pstream *p, struct packet *packet, void *userdata); + void *recieve_packet_callback_userdata; + + void (*recieve_memblock_callback) (struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata); + void *recieve_memblock_callback_userdata; +}; + +static void do_write(struct pstream *p); +static void do_read(struct pstream *p); + +static void io_callback(struct iochannel*io, void *userdata) { + struct pstream *p = userdata; + assert(p && p->io == io); + do_write(p); + do_read(p); +} + +static void prepare_callback(struct mainloop_source *s, void*userdata) { + struct pstream *p = userdata; + assert(p && p->mainloop_source == s); + do_write(p); + do_read(p); +} + +struct pstream *pstream_new(struct mainloop *m, struct iochannel *io) { + struct pstream *p; + assert(io); + + p = malloc(sizeof(struct pstream)); + assert(p); + + p->io = io; + iochannel_set_callback(io, io_callback, p); + + p->dead = 0; + + p->mainloop = m; + p->mainloop_source = mainloop_source_new_prepare(m, prepare_callback, p); + mainloop_source_enable(p->mainloop_source, 0); + + p->send_queue = queue_new(); + assert(p->send_queue); + + p->write.current = NULL; + p->write.index = 0; + + p->read.memblock = NULL; + p->read.packet = NULL; + p->read.index = 0; + + p->send_callback = NULL; + p->send_callback_userdata = NULL; + + p->recieve_packet_callback = NULL; + p->recieve_packet_callback_userdata = NULL; + + p->recieve_memblock_callback = NULL; + p->recieve_memblock_callback_userdata = NULL; + + return p; +} + +static void item_free(void *item, void *p) { + struct item_info *i = item; + assert(i); + + if (i->type == PSTREAM_ITEM_PACKET) { + assert(i->chunk.memblock); + memblock_unref(i->chunk.memblock); + } else { + assert(i->type == PSTREAM_ITEM_MEMBLOCK); + assert(i->packet); + packet_unref(i->packet); + } + + free(i); +} + +void pstream_free(struct pstream *p) { + assert(p); + + iochannel_free(p->io); + queue_free(p->send_queue, item_free, NULL); + + if (p->write.current) + item_free(p->write.current, NULL); + + if (p->read.memblock) + memblock_unref(p->read.memblock); + + if (p->read.packet) + packet_unref(p->read.packet); + + mainloop_source_free(p->mainloop_source); + free(p); +} + +void pstream_set_send_callback(struct pstream*p, void (*callback) (struct pstream *p, void *userdata), void *userdata) { + assert(p && callback); + + p->send_callback = callback; + p->send_callback_userdata = userdata; +} + +void pstream_send_packet(struct pstream*p, struct packet *packet) { + struct item_info *i; + assert(p && packet); + + i = malloc(sizeof(struct item_info)); + assert(i); + i->type = PSTREAM_ITEM_PACKET; + i->packet = packet; + + queue_push(p->send_queue, i); + mainloop_source_enable(p->mainloop_source, 1); +} + +void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, struct memchunk *chunk) { + struct item_info *i; + assert(p && channel && chunk); + + i = malloc(sizeof(struct item_info)); + assert(i); + i->type = PSTREAM_ITEM_MEMBLOCK; + i->chunk = *chunk; + i->channel = channel; + i->delta = delta; + + queue_push(p->send_queue, i); + mainloop_source_enable(p->mainloop_source, 1); +} + +void pstream_set_recieve_packet_callback(struct pstream *p, void (*callback) (struct pstream *p, struct packet *packet, void *userdata), void *userdata) { + assert(p && callback); + + p->recieve_packet_callback = callback; + p->recieve_packet_callback_userdata = userdata; +} + +void pstream_set_recieve_memblock_callback(struct pstream *p, void (*callback) (struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata), void *userdata) { + assert(p && callback); + + p->recieve_memblock_callback = callback; + p->recieve_memblock_callback_userdata = userdata; +} + +static void prepare_next_write_item(struct pstream *p) { + assert(p); + + if (!(p->write.current = queue_pop(p->send_queue))) + return; + + p->write.index = 0; + + if (p->write.current->type == PSTREAM_ITEM_PACKET) { + assert(p->write.current->packet); + p->write.data = p->write.current->packet->data; + p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH] = p->write.current->packet->length; + p->write.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] = 0; + p->write.descriptor[PSTREAM_DESCRIPTOR_DELTA] = 0; + } else { + assert(p->write.current->type == PSTREAM_ITEM_MEMBLOCK && p->write.current->chunk.memblock); + p->write.data = p->write.current->chunk.memblock->data + p->write.current->chunk.index; + p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH] = p->write.current->chunk.length; + p->write.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] = p->write.current->channel; + p->write.descriptor[PSTREAM_DESCRIPTOR_DELTA] = p->write.current->delta; + } +} + +static void do_write(struct pstream *p) { + void *d; + size_t l; + ssize_t r; + assert(p); + + mainloop_source_enable(p->mainloop_source, 0); + + if (p->dead || !iochannel_is_writable(p->io)) + return; + + if (!p->write.current) + prepare_next_write_item(p); + + if (!p->write.current) + return; + + assert(p->write.data); + + if (p->write.index < PSTREAM_DESCRIPTOR_SIZE) { + d = (void*) p->write.descriptor + p->write.index; + l = PSTREAM_DESCRIPTOR_SIZE - p->write.index; + } else { + d = (void*) p->write.data + p->write.index - PSTREAM_DESCRIPTOR_SIZE; + l = p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH] - p->write.index - PSTREAM_DESCRIPTOR_SIZE; + } + + if ((r = iochannel_write(p->io, d, l)) < 0) { + p->dead = 1; + return; + } + + p->write.index += r; + + if (p->write.index >= PSTREAM_DESCRIPTOR_SIZE+p->write.descriptor[PSTREAM_DESCRIPTOR_LENGTH]) { + assert(p->write.current); + item_free(p->write.current, (void *) 1); + p->write.current = NULL; + + if (p->send_callback && queue_is_empty(p->send_queue)) + p->send_callback(p, p->send_callback_userdata); + } +} + +static void do_read(struct pstream *p) { + void *d; + size_t l; + ssize_t r; + assert(p); + + mainloop_source_enable(p->mainloop_source, 0); + + if (p->dead || !iochannel_is_readable(p->io)) + return; + + if (p->read.index < PSTREAM_DESCRIPTOR_SIZE) { + d = (void*) p->read.descriptor + p->read.index; + l = PSTREAM_DESCRIPTOR_SIZE - p->read.index; + } else { + assert(p->read.data); + d = (void*) p->read.data + p->read.index - PSTREAM_DESCRIPTOR_SIZE; + l = p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH] - p->read.index - PSTREAM_DESCRIPTOR_SIZE; + } + + if ((r = iochannel_read(p->io, d, l)) <= 0) { + p->dead = 1; + return; + } + + p->read.index += r; + + if (p->read.index == PSTREAM_DESCRIPTOR_SIZE) { + /* Reading of frame descriptor complete */ + + /* Frame size too large */ + if (p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH] > FRAME_SIZE_MAX) { + p->dead = 1; + return; + } + + assert(!p->read.packet && !p->read.memblock); + + if (p->read.descriptor[PSTREAM_DESCRIPTOR_CHANNEL] == 0) { + /* Frame is a packet frame */ + p->read.packet = packet_new(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]); + assert(p->read.packet); + p->read.data = p->read.packet->data; + } else { + /* Frame is a memblock frame */ + p->read.memblock = memblock_new(p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH]); + assert(p->read.memblock); + p->read.data = p->read.memblock->data; + } + + } else if (p->read.index > PSTREAM_DESCRIPTOR_SIZE) { + /* Frame payload available */ + + if (p->read.memblock && p->recieve_memblock_callback) { /* Is this memblockd data? Than pass it to the user */ + size_t l; + + l = p->read.index - r < PSTREAM_DESCRIPTOR_SIZE ? p->read.index - PSTREAM_DESCRIPTOR_SIZE : r; + + if (l > 0) { + struct memchunk chunk; + + chunk.memblock = p->read.memblock; + chunk.index = p->read.index - PSTREAM_DESCRIPTOR_SIZE - l; + chunk.length = l; + + p->recieve_memblock_callback(p, p->read.descriptor[PSTREAM_DESCRIPTOR_CHANNEL], (int32_t) p->read.descriptor[PSTREAM_DESCRIPTOR_DELTA], &chunk, p->recieve_memblock_callback_userdata); + } + } + + /* Frame complete */ + if (p->read.index >= p->read.descriptor[PSTREAM_DESCRIPTOR_LENGTH] + PSTREAM_DESCRIPTOR_SIZE) { + if (p->read.memblock) { + assert(!p->read.packet); + + memblock_unref(p->read.memblock); + p->read.memblock = NULL; + } else { + assert(p->read.packet); + + if (p->recieve_packet_callback) + p->recieve_packet_callback(p, p->read.packet, p->recieve_packet_callback_userdata); + + packet_unref(p->read.packet); + p->read.packet = NULL; + } + + p->read.index = 0; + } + } +} diff --git a/src/pstream.h b/src/pstream.h new file mode 100644 index 00000000..c0b57496 --- /dev/null +++ b/src/pstream.h @@ -0,0 +1,22 @@ +#ifndef foopstreamhfoo +#define foopstreamhfoo + +#include + +#include "packet.h" +#include "memblock.h" +#include "iochannel.h" + +struct pstream; + +struct pstream* pstream_new(struct mainloop *m, struct iochannel *io); +void pstream_free(struct pstream*p); + +void pstream_set_send_callback(struct pstream*p, void (*callback) (struct pstream *p, void *userdata), void *userdata); +void pstream_send_packet(struct pstream*p, struct packet *packet); +void pstream_send_memblock(struct pstream*p, uint32_t channel, int32_t delta, struct memchunk *chunk); + +void pstream_set_recieve_packet_callback(struct pstream *p, void (*callback) (struct pstream *p, struct packet *packet, void *userdata), void *userdata); +void pstream_set_recieve_memblock_callback(struct pstream *p, void (*callback) (struct pstream *p, uint32_t channel, int32_t delta, struct memchunk *chunk, void *userdata), void *userdata); + +#endif diff --git a/src/queue.c b/src/queue.c new file mode 100644 index 00000000..90823ae6 --- /dev/null +++ b/src/queue.c @@ -0,0 +1,77 @@ +#include +#include + +#include "queue.h" + +struct queue_entry { + struct queue_entry *next; + void *data; +}; + +struct queue { + struct queue_entry *front, *back; + unsigned length; +}; + +struct queue* queue_new(void) { + struct queue *q = malloc(sizeof(struct queue)); + assert(q); + q->front = q->back = NULL; + q->length = 0; + return q; +} + +void queue_free(struct queue* q, void (*destroy)(void *p, void *userdata), void *userdata) { + struct queue_entry *e; + assert(q); + + e = q->front; + while (e) { + struct queue_entry *n = e->next; + + if (destroy) + destroy(e->data, userdata); + + free(e); + e = n; + } + + free(q); +} + +void queue_push(struct queue *q, void *p) { + struct queue_entry *e; + + e = malloc(sizeof(struct queue_entry)); + + e->data = p; + e->next = NULL; + + if (q->back) + q->back->next = e; + else { + assert(!q->front); + q->front = e; + } + + q->back = e; + q->length++; +} + +void* queue_pop(struct queue *q) { + void *p; + struct queue_entry *e; + assert(q); + + if (!(e = q->front)) + return NULL; + + q->front = e->next; + if (q->back == e) + q->back = NULL; + + p = e->data; + free(e); + + return p; +} diff --git a/src/queue.h b/src/queue.h new file mode 100644 index 00000000..6b371a81 --- /dev/null +++ b/src/queue.h @@ -0,0 +1,13 @@ +#ifndef fooqueuehfoo +#define fooqueuehfoo + +struct queue; + +struct queue* queue_new(void); +void queue_free(struct queue* q, void (*destroy)(void *p, void *userdata), void *userdata); +void queue_push(struct queue *q, void *p); +void* queue_pop(struct queue *q); + +int queue_is_empty(struct queue *q); + +#endif diff --git a/src/sample.c b/src/sample.c new file mode 100644 index 00000000..74a54937 --- /dev/null +++ b/src/sample.c @@ -0,0 +1,80 @@ +#include +#include + +#include "sample.h" + +struct sample_spec default_sample_spec = { + .format = SAMPLE_S16NE, + .rate = 44100, + .channels = 2 +}; + +struct memblock *silence(struct memblock* b, struct sample_spec *spec) { + char c; + assert(b && spec); + memblock_assert_exclusive(b); + + switch (spec->format) { + case SAMPLE_U8: + c = 127; + break; + case SAMPLE_S16LE: + case SAMPLE_S16BE: + case SAMPLE_FLOAT32: + c = 0; + break; + case SAMPLE_ALAW: + case SAMPLE_ULAW: + c = 80; + break; + } + + memset(b->data, c, b->length); + return b; +} + +void add_clip(struct memchunk *target, struct memchunk *chunk, struct sample_spec *spec) { + int16_t *p, *d; + size_t i; + assert(target && target->memblock && chunk && chunk->memblock && spec); + assert(spec->format == SAMPLE_S16NE); + assert((target->length & 1) == 0); + + d = target->memblock->data + target->index; + p = chunk->memblock->data + chunk->index; + + for (i = 0; i < target->length && i < chunk->length; i++) { + int32_t r = (int32_t) *d + (int32_t) *p; + if (r < -0x8000) r = 0x8000; + if (r > 0x7FFF) r = 0x7FFF; + *d = (int16_t) r; + } +} + +size_t sample_size(struct sample_spec *spec) { + assert(spec); + size_t b; + + switch (spec->format) { + case SAMPLE_U8: + case SAMPLE_ULAW: + case SAMPLE_ALAW: + b = 1; + break; + case SAMPLE_S16LE: + case SAMPLE_S16BE: + b = 2; + break; + case SAMPLE_FLOAT32: + b = 4; + break; + } + + return b * spec->channels; +} + +size_t bytes_per_second(struct sample_spec *spec) { + assert(spec); + return spec->rate*sample_size(spec); +} + diff --git a/src/sample.h b/src/sample.h new file mode 100644 index 00000000..ecbe33f2 --- /dev/null +++ b/src/sample.h @@ -0,0 +1,35 @@ +#ifndef foosamplehfoo +#define foosamplehfoo + +#include + +#include "memblock.h" + +enum sample_format { + SAMPLE_U8, + SAMPLE_ALAW, + SAMPLE_ULAW, + SAMPLE_S16LE, + SAMPLE_S16BE, + SAMPLE_FLOAT32 +}; + +#define SAMPLE_S16NE SAMPLE_S16LE + +struct sample_spec { + enum sample_format format; + uint32_t rate; + uint32_t channels; +}; + +#define DEFAULT_SAMPLE_SPEC default_sample_spec + +extern struct sample_spec default_sample_spec; + +struct memblock *silence(struct memblock* b, struct sample_spec *spec); +void add_clip(struct memchunk *target, struct memchunk *chunk, struct sample_spec *spec); + +size_t bytes_per_second(struct sample_spec *spec); +size_t sample_size(struct sample_spec *spec); + +#endif diff --git a/src/sink-pipe.c b/src/sink-pipe.c new file mode 100644 index 00000000..4a8348f8 --- /dev/null +++ b/src/sink-pipe.c @@ -0,0 +1,155 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iochannel.h" +#include "sink.h" +#include "module.h" + +struct userdata { + struct sink *sink; + struct iochannel *io; + struct core *core; + struct mainloop_source *mainloop_source; + + struct memchunk memchunk; +}; + +static void do_write(struct userdata *u) { + ssize_t r; + assert(u); + + mainloop_source_enable(u->mainloop_source, 0); + + if (!iochannel_is_writable(u->io)) + return; + + if (!u->memchunk.length) + if (sink_render(u->sink, PIPE_BUF, &u->memchunk) < 0) + return; + + assert(u->memchunk.memblock && u->memchunk.length); + + if ((r = iochannel_write(u->io, u->memchunk.memblock->data + u->memchunk.index, u->memchunk.length)) < 0) { + fprintf(stderr, "write() failed: %s\n", strerror(errno)); + return; + } + + u->memchunk.index += r; + u->memchunk.length -= r; + + if (u->memchunk.length <= 0) { + memblock_unref(u->memchunk.memblock); + u->memchunk.memblock = NULL; + } +} + +static void notify_callback(struct sink*s, void *userdata) { + struct userdata *u = userdata; + assert(u); + + if (iochannel_is_writable(u->io)) + mainloop_source_enable(u->mainloop_source, 1); +} + +static void prepare_callback(struct mainloop_source *src, void *userdata) { + struct userdata *u = userdata; + assert(u); + do_write(u); +} + +static void io_callback(struct iochannel *io, void*userdata) { + struct userdata *u = userdata; + assert(u); + do_write(u); +} + +int module_init(struct core *c, struct module*m) { + struct userdata *u = NULL; + struct stat st; + struct sink *sink; + char *p; + int fd = -1; + const static struct sample_spec ss = { + .format = SAMPLE_S16NE, + .rate = 44100, + .channels = 2, + }; + assert(c && m); + + mkfifo((p = m->argument ? m->argument : "/tmp/musicfifo"), 0777); + + if ((fd = open(p, O_RDWR) < 0)) { + fprintf(stderr, "open('%s'): %s\n", p, strerror(errno)); + goto fail; + } + + if (fstat(fd, &st) < 0) { + fprintf(stderr, "fstat('%s'): %s\n", p, strerror(errno)); + goto fail; + } + + if (!S_ISFIFO(st.st_mode)) { + fprintf(stderr, "'%s' is not a FIFO\n", p); + goto fail; + } + + if (!(sink = sink_new(c, "fifo", &ss))) { + fprintf(stderr, "Failed to allocate new sink!\n"); + goto fail; + } + + u = malloc(sizeof(struct userdata)); + assert(u); + + u->core = c; + u->sink = sink; + sink_set_notify_callback(sink, notify_callback, u); + + u->io = iochannel_new(c->mainloop, -1, fd); + assert(u->io); + iochannel_set_callback(u->io, io_callback, u); + + u->memchunk.memblock = NULL; + u->memchunk.length = 0; + + u->mainloop_source = mainloop_source_new_prepare(c->mainloop, prepare_callback, u); + assert(u->mainloop_source); + mainloop_source_enable(u->mainloop_source, 0); + + m->userdata = u; + + + return 0; + +fail: + if (fd >= 0) + close(fd); + + if (u) + free(u); + + return -1; +} + +void module_done(struct core *c, struct module*m) { + struct userdata *u; + assert(c && m); + + u = m->userdata; + assert(u); + + if (u->memchunk.memblock) + memblock_unref(u->memchunk.memblock); + + sink_free(u->sink); + iochannel_free(u->io); + mainloop_source_free(u->mainloop_source); + free(u); +} diff --git a/src/sink.c b/src/sink.c new file mode 100644 index 00000000..ac387c78 --- /dev/null +++ b/src/sink.c @@ -0,0 +1,217 @@ +#include +#include +#include +#include + +#include "sink.h" +#include "inputstream.h" + +struct sink* sink_new(struct core *core, const char *name, const struct sample_spec *spec) { + struct sink *s; + char *n = NULL; + int r; + assert(core && spec); + + s = malloc(sizeof(struct sink)); + assert(s); + + s->name = name ? strdup(name) : NULL; + r = idxset_put(core->sinks, s, &s->index); + assert(s->index != IDXSET_INVALID && r >= 0); + + s->core = core; + s->sample_spec = *spec; + s->input_streams = idxset_new(NULL, NULL); + + if (name) { + n = malloc(strlen(name)+9); + sprintf(n, "%s_monitor", name); + } + + s->monitor_source = source_new(core, n, spec); + s->volume = 0xFF; + + s->notify_callback = NULL; + s->userdata = NULL; + + return s; +} + +void sink_free(struct sink *s) { + struct input_stream *i; + assert(s); + + idxset_remove_by_data(s->core->sinks, s, NULL); + source_free(s->monitor_source); + + while ((i = idxset_rrobin(s->input_streams, NULL))) + input_stream_free(i); + + free(s->name); + free(s); +} + +struct pass1_info { + size_t maxlength; + unsigned count; + struct input_stream *last_input_stream; +}; + +static int get_max_length(void *p, uint32_t index, int *del, void*userdata) { + struct memchunk chunk; + struct pass1_info *info = userdata; + struct input_stream*i = p; + assert(info && i); + + if (memblockq_peek(i->memblockq, &chunk) != 0) + return 0; + + assert(chunk.length); + + if (info->maxlength > chunk.length) + info->maxlength = chunk.length; + + info->count++; + info->last_input_stream = i; + + return 0; +} + +struct pass2_info { + struct memchunk *chunk; + struct sample_spec *spec; +}; + +static int do_mix(void *p, uint32_t index, int *del, void*userdata) { + struct memchunk chunk; + struct pass2_info *info = userdata; + struct input_stream*i = p; + assert(info && info->chunk && info->chunk->memblock && i && info->spec); + + if (memblockq_peek(i->memblockq, &chunk) != 0) + return 0; + + memblock_assert_exclusive(info->chunk->memblock); + assert(chunk.length && chunk.length <= info->chunk->memblock->length - info->chunk->index); + + add_clip(info->chunk, &chunk, info->spec); + return 0; +} + +int sink_render_into(struct sink*s, struct memblock *target, struct memchunk *result) { + struct pass1_info pass1_info; + struct pass2_info pass2_info; + assert(s && target && result); + memblock_assert_exclusive(target); + + /* Calculate how many bytes to mix */ + pass1_info.maxlength = target->length; + pass1_info.count = 0; + + idxset_foreach(s->input_streams, get_max_length, &pass1_info); + assert(pass1_info.maxlength); + + /* No data to mix */ + if (pass1_info.count == 0) + return -1; + + /* A shortcut if only a single input stream is connected */ + if (pass1_info.count == 1) { + struct input_stream *i = pass1_info.last_input_stream; + struct memchunk chunk; + size_t l; + + assert(i); + + if (memblockq_peek(i->memblockq, &chunk) != 0) + return -1; + + l = target->length < chunk.length ? target->length : chunk.length; + memcpy(target->data, result->memblock+result->index, l); + target->length = l; + memblock_unref(chunk.memblock); + memblockq_drop(i->memblockq, l); + + result->memblock = target; + result->length = l; + result->index = 0; + return 0; + } + + /* Do the real mixing */ + result->memblock = silence(target, &s->sample_spec); + result->index = 0; + result->length = pass1_info.maxlength; + pass2_info.chunk = result; + pass2_info.spec = &s->sample_spec; + idxset_foreach(s->input_streams, do_mix, &pass2_info); + + assert(s->monitor_source); + source_post(s->monitor_source, result); + + return 0; +} + +int sink_render(struct sink*s, size_t length, struct memchunk *result) { + struct pass1_info pass1_info; + struct pass2_info pass2_info; + assert(s && result); + + if (!length) + length = (size_t) -1; + + /* Calculate how many bytes to mix */ + pass1_info.maxlength = length; + pass1_info.count = 0; + + idxset_foreach(s->input_streams, get_max_length, &pass1_info); + assert(pass1_info.maxlength); + + /* No data to mix */ + if (pass1_info.count == 0) + return -1; + + if (pass1_info.count == 1) { + struct input_stream *i = pass1_info.last_input_stream; + size_t l; + + assert(i); + + if (memblockq_peek(i->memblockq, result) != 0) + return -1; + + l = length < result->length ? length : result->length; + result->length = l; + memblockq_drop(i->memblockq, l); + return 0; + } + + /* Do the mixing */ + result->memblock = silence(memblock_new(result->length), &s->sample_spec); + result->index = 0; + result->length = pass1_info.maxlength; + pass2_info.chunk = result; + pass2_info.spec = &s->sample_spec; + idxset_foreach(s->input_streams, do_mix, &pass2_info); + + assert(s->monitor_source); + + source_post(s->monitor_source, result); + return 0; +} + +void sink_notify(struct sink*s) { + assert(s); + + if (s->notify_callback) + s->notify_callback(s, s->userdata); +} + +void sink_set_notify_callback(struct sink *s, void (*notify_callback)(struct sink*sink, void *userdata), void *userdata) { + assert(s && notify_callback); + + s->notify_callback = notify_callback; + s->userdata = userdata; +} + + diff --git a/src/sink.h b/src/sink.h new file mode 100644 index 00000000..a6f98005 --- /dev/null +++ b/src/sink.h @@ -0,0 +1,38 @@ +#ifndef foosinkhfoo +#define foosinkhfoo + +struct sink; + +#include + +#include "core.h" +#include "sample.h" +#include "idxset.h" +#include "source.h" + +struct sink { + char *name; + uint32_t index; + + struct core *core; + struct sample_spec sample_spec; + struct idxset *input_streams; + + struct source *monitor_source; + + uint8_t volume; + + void (*notify_callback)(struct sink*sink, void *userdata); + void *userdata; +}; + +struct sink* sink_new(struct core *core, const char *name, const struct sample_spec *spec); +void sink_free(struct sink* s); + +int sink_render(struct sink*s, size_t length, struct memchunk *result); +int sink_render_into(struct sink*s, struct memblock *target, struct memchunk *result); + +void sink_notify(struct sink*s); +void sink_set_notify_callback(struct sink *s, void (*notify_callback)(struct sink*sink, void *userdata), void *userdata); + +#endif diff --git a/src/socket-server.c b/src/socket-server.c new file mode 100644 index 00000000..2a1db9a0 --- /dev/null +++ b/src/socket-server.c @@ -0,0 +1,157 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "socket-server.h" + +struct socket_server { + int fd; + char *filename; + + void (*on_connection)(struct socket_server*s, struct iochannel *io, void *userdata); + void *userdata; + + struct mainloop_source *mainloop_source; +}; + +static void callback(struct mainloop_source*src, int fd, enum mainloop_io_event event, void *userdata) { + struct socket_server *s = userdata; + struct iochannel *io; + int nfd; + assert(src && fd >= 0 && fd == s->fd && event == MAINLOOP_IO_EVENT_IN && s); + + if ((nfd = accept(fd, NULL, NULL)) < 0) { + fprintf(stderr, "accept(): %s\n", strerror(errno)); + return; + } + + if (!s->on_connection) { + close(nfd); + return; + } + + io = iochannel_new(mainloop_source_get_mainloop(src), nfd, nfd); + assert(io); + s->on_connection(s, io, s->userdata); +} + +struct socket_server* socket_server_new(struct mainloop *m, int fd) { + struct socket_server *s; + assert(m && fd >= 0); + + s = malloc(sizeof(struct socket_server)); + assert(s); + s->fd = fd; + s->filename = NULL; + s->on_connection = NULL; + s->userdata = NULL; + + s->mainloop_source = mainloop_source_new_io(m, fd, MAINLOOP_IO_EVENT_IN, callback, s); + assert(s->mainloop_source); + + return s; +} + +struct socket_server* socket_server_new_unix(struct mainloop *m, const char *filename) { + int fd = -1; + struct sockaddr_un sa; + struct socket_server *s; + + assert(m && filename); + + if ((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) < 0) { + fprintf(stderr, "socket(): %s\n", strerror(errno)); + goto fail; + } + + sa.sun_family = AF_LOCAL; + strncpy(sa.sun_path, filename, sizeof(sa.sun_path)-1); + sa.sun_path[sizeof(sa.sun_path) - 1] = 0; + + if (bind(fd, (struct sockaddr*) &sa, SUN_LEN(&sa)) < 0) { + fprintf(stderr, "bind(): %s\n", strerror(errno)); + goto fail; + } + + if (listen(fd, 5) < 0) { + fprintf(stderr, "listen(): %s\n", strerror(errno)); + goto fail; + } + + s = socket_server_new(m, fd); + assert(s); + + s->filename = strdup(filename); + assert(s->filename); + + return s; + +fail: + if (fd >= 0) + close(fd); + + return NULL; +} + +struct socket_server* socket_server_new_ipv4(struct mainloop *m, uint32_t address, uint16_t port) { + int fd = -1; + struct sockaddr_in sa; + + assert(m && port); + + if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) { + fprintf(stderr, "socket(): %s\n", strerror(errno)); + goto fail; + } + + sa.sin_family = AF_INET; + sa.sin_port = htons(port); + sa.sin_addr.s_addr = htonl(address); + + if (bind(fd, (struct sockaddr *) &sa, sizeof(sa)) < 0) { + fprintf(stderr, "bind(): %s\n", strerror(errno)); + goto fail; + } + + if (listen(fd, 5) < 0) { + fprintf(stderr, "listen(): %s\n", strerror(errno)); + goto fail; + } + + return socket_server_new(m, fd); + +fail: + if (fd >= 0) + close(fd); + + return NULL; +} + +void socket_server_free(struct socket_server*s) { + assert(s); + close(s->fd); + + if (s->filename) { + unlink(s->filename); + free(s->filename); + } + + mainloop_source_free(s->mainloop_source); + + free(s); +} + +void socket_server_set_callback(struct socket_server*s, void (*on_connection)(struct socket_server*s, struct iochannel *io, void *userdata), void *userdata) { + assert(s); + + s->on_connection = on_connection; + s->userdata = userdata; +} diff --git a/src/socket-server.h b/src/socket-server.h new file mode 100644 index 00000000..4814fc62 --- /dev/null +++ b/src/socket-server.h @@ -0,0 +1,18 @@ +#ifndef foosocketserverhfoo +#define foosocketserverhfoo + +#include +#include "mainloop.h" +#include "iochannel.h" + +struct socket_server; + +struct socket_server* socket_server_new(struct mainloop *m, int fd); +struct socket_server* socket_server_new_unix(struct mainloop *m, const char *filename); +struct socket_server* socket_server_new_ipv4(struct mainloop *m, uint32_t address, uint16_t port); + +void socket_server_free(struct socket_server*s); + +void socket_server_set_callback(struct socket_server*s, void (*on_connection)(struct socket_server*s, struct iochannel *io, void *userdata), void *userdata); + +#endif diff --git a/src/source.c b/src/source.c new file mode 100644 index 00000000..2f34c461 --- /dev/null +++ b/src/source.c @@ -0,0 +1,58 @@ +#include +#include +#include + +#include "source.h" +#include "outputstream.h" + +struct source* source_new(struct core *core, const char *name, const struct sample_spec *spec) { + struct source *s; + int r; + assert(core && spec); + + s = malloc(sizeof(struct source)); + assert(s); + + s->name = name ? strdup(name) : NULL; + r = idxset_put(core->sources, s, &s->index); + assert(s->index != IDXSET_INVALID && r >= 0); + + s->core = core; + s->sample_spec = *spec; + s->output_streams = idxset_new(NULL, NULL); + + s->link_change_callback = NULL; + s->userdata = NULL; + + return s; +} + +static void do_free(void *p, void *userdata) { + struct output_stream *o = p; + assert(o); + output_stream_free(o); +}; + +void source_free(struct source *s) { + assert(s); + + idxset_remove_by_data(s->core->sources, s, NULL); + idxset_free(s->output_streams, do_free, NULL); + free(s->name); + free(s); +} + +static int do_post(void *p, uint32_t index, int *del, void*userdata) { + struct memchunk *chunk = userdata; + struct output_stream *o = p; + assert(o && o->memblockq && index && del && chunk); + + memblockq_push(o->memblockq, chunk, 0); + return 0; +} + +void source_post(struct source*s, struct memchunk *chunk) { + assert(s && chunk); + + idxset_foreach(s->output_streams, do_post, chunk); +} diff --git a/src/source.h b/src/source.h new file mode 100644 index 00000000..3beb3f96 --- /dev/null +++ b/src/source.h @@ -0,0 +1,30 @@ +#ifndef foosourcehfoo +#define foosourcehfoo + +struct source; + +#include +#include "core.h" +#include "sample.h" +#include "idxset.h" +#include "memblock.h" + +struct source { + char *name; + uint32_t index; + + struct core *core; + struct sample_spec sample_spec; + struct idxset *output_streams; + + void (*link_change_callback)(struct source*source, void *userdata); + void *userdata; +}; + +struct source* source_new(struct core *core, const char *name, const struct sample_spec *spec); +void source_free(struct source *s); + +/* Pass a new memory block to all output streams */ +void source_post(struct source*s, struct memchunk *b); + +#endif diff --git a/src/strbuf.c b/src/strbuf.c new file mode 100644 index 00000000..7c8b965d --- /dev/null +++ b/src/strbuf.c @@ -0,0 +1,122 @@ +#ifndef foostrbufhfoo +#define foostrbufhfoo + +#include +#include +#include +#include +#include +#include + +struct chunk { + struct chunk *next; + char text[]; +}; + +struct strbuf { + size_t length; + struct chunk *head, *tail; +}; + +struct strbuf *strbuf_new(void) { + struct strbuf *sb = malloc(sizeof(struct strbuf)); + assert(sb); + sb->length = 0; + sb->head = sb->tail = NULL; + return sb; +} + +void strbuf_free(struct strbuf *sb) { + assert(sb); + while (sb->head) { + struct chunk *c = sb->head; + sb->head = sb->head->next; + free(c); + } + + free(sb); +} + +char *strbuf_tostring(struct strbuf *sb) { + char *t, *e; + struct chunk *c; + assert(sb); + + t = malloc(sb->length+1); + assert(t); + + e = t; + *e = 0; + for (c = sb->head; c; c = c->next) { + strcpy(e, c->text); + e = strchr(e, 0); + } + + return t; +} + +void strbuf_puts(struct strbuf *sb, const char *t) { + struct chunk *c; + size_t l; + assert(sb && t); + + l = strlen(t); + c = malloc(sizeof(struct chunk)+l); + assert(c); + + c->next = NULL; + strcpy(c->text, t); + + if (sb->tail) { + assert(sb->head); + sb->tail->next = c; + } else { + assert(!sb->head); + sb->head = c; + } + + sb->tail = c; + sb->length += l; +} + +int strbuf_printf(struct strbuf *sb, const char *format, ...) { + int r, size = 100; + struct chunk *c = NULL; + + assert(sb); + + for(;;) { + va_list ap; + + c = realloc(c, sizeof(struct chunk)+size); + assert(c); + + va_start(ap, format); + r = vsnprintf(c->text, size, format, ap); + va_end(ap); + + if (r > -1 && r < size) { + c->next = NULL; + + if (sb->tail) { + assert(sb->head); + sb->tail->next = c; + } else { + assert(!sb->head); + sb->head = c; + } + + sb->tail = c; + sb->length += r; + + return r; + } + + if (r > -1) /* glibc 2.1 */ + size = r+1; + else /* glibc 2.0 */ + size *= 2; + } +} + +#endif diff --git a/src/strbuf.h b/src/strbuf.h new file mode 100644 index 00000000..6ad582a3 --- /dev/null +++ b/src/strbuf.h @@ -0,0 +1,13 @@ +#ifndef foostrbufhfoo +#define foostrbufhfoo + +struct strbuf; + +struct strbuf *strbuf_new(void); +void strbuf_free(struct strbuf *sb); +char *strbuf_tostring(struct strbuf *sb); + +int strbuf_printf(struct strbuf *sb, const char *format, ...); +void strbuf_puts(struct strbuf *sb, const char *t); + +#endif -- cgit