diff options
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/client.c | 16 | ||||
-rw-r--r-- | src/client.h | 15 | ||||
-rw-r--r-- | src/core.c | 12 | ||||
-rw-r--r-- | src/core.h | 2 | ||||
-rw-r--r-- | src/idxset.c | 21 | ||||
-rw-r--r-- | src/idxset.h | 2 | ||||
-rw-r--r-- | src/module-pipe-sink.c | 11 | ||||
-rw-r--r-- | src/protocol-simple.c | 165 | ||||
-rw-r--r-- | src/sample.c | 15 | ||||
-rw-r--r-- | src/sample.h | 2 | ||||
-rw-r--r-- | src/sink.c | 49 | ||||
-rw-r--r-- | src/sink.h | 20 | ||||
-rw-r--r-- | src/sinkinput.c | 62 | ||||
-rw-r--r-- | src/sinkinput.h | 40 | ||||
-rw-r--r-- | src/source.c | 35 | ||||
-rw-r--r-- | src/source.h | 8 | ||||
-rw-r--r-- | src/sourceoutput.c | 38 | ||||
-rw-r--r-- | src/sourceoutput.h | 23 |
19 files changed, 261 insertions, 277 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index f0953846..40e2039d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -26,7 +26,7 @@ pkglib_LTLIBRARIES=libprotocol-simple.la module-simple-protocol-tcp.la \ polypaudio_SOURCES = idxset.c queue.c strbuf.c mainloop.c \ memblock.c sample.c memblockq.c client.c \ - core.c main.c outputstream.c inputstream.c source.c sink.c \ + core.c main.c sourceoutput.c sinkinput.c source.c sink.c \ module.c polypaudio_INCLUDES = $(INCLTDL) polypaudio_LDADD = $(LIBLTDL) diff --git a/src/client.c b/src/client.c index 578d51c3..1b84c6bf 100644 --- a/src/client.c +++ b/src/client.c @@ -12,11 +12,12 @@ struct client *client_new(struct core *core, const char *protocol_name, char *na c = malloc(sizeof(struct client)); assert(c); - c->protocol_name = protocol_name; c->name = name ? strdup(name) : NULL; - c->kill = NULL; - c->kill_userdata = NULL; c->core = core; + c->protocol_name = protocol_name; + + c->kill = NULL; + c->userdata = NULL; r = idxset_put(core->clients, c, &c->index); assert(c->index != IDXSET_INVALID && r >= 0); @@ -35,14 +36,9 @@ void client_free(struct client *c) { free(c); } -void client_set_kill_callback(struct client *c, void (*kill)(struct client *c, void *userdata), void *userdata) { - assert(c && kill); - c->kill = kill; - c->kill_userdata = userdata; -} - void client_kill(struct client *c) { assert(c); - c->kill(c, c->kill_userdata); + if (c->kill) + c->kill(c); } diff --git a/src/client.h b/src/client.h index 8d9e519c..556b5fb3 100644 --- a/src/client.h +++ b/src/client.h @@ -4,15 +4,15 @@ #include "core.h" struct client { - char *name; uint32_t index; - + + char *name; + struct core *core; const char *protocol_name; - void *kill_userdata; - void (*kill)(struct client *c, void *userdata); + void (*kill)(struct client *c); - struct core *core; + void *userdata; }; struct client *client_new(struct core *c, const char *protocol_name, char *name); @@ -20,11 +20,6 @@ struct client *client_new(struct core *c, const char *protocol_name, char *name) /* This function should be called only by the code that created the client */ void client_free(struct client *c); -/* The registrant of the client should call this function to set a - * callback function which is called when destruction of the client is - * requested */ -void client_set_kill_callback(struct client *c, void (*kill)(struct client *c, void *userdata), void *userdata); - /* Code that didn't create the client should call this function to * request destruction of the client */ void client_kill(struct client *c); @@ -16,8 +16,8 @@ struct core* core_new(struct mainloop *m) { c->clients = idxset_new(NULL, NULL); c->sinks = idxset_new(NULL, NULL); c->sources = idxset_new(NULL, NULL); - c->output_streams = idxset_new(NULL, NULL); - c->input_streams = idxset_new(NULL, NULL); + c->source_outputs = idxset_new(NULL, NULL); + c->sink_inputs = idxset_new(NULL, NULL); c->default_source_index = c->default_sink_index = IDXSET_INVALID; @@ -41,11 +41,11 @@ void core_free(struct core *c) { assert(idxset_isempty(c->sources)); idxset_free(c->sources, NULL, NULL); - assert(idxset_isempty(c->output_streams)); - idxset_free(c->output_streams, NULL, NULL); + assert(idxset_isempty(c->source_outputs)); + idxset_free(c->source_outputs, NULL, NULL); - assert(idxset_isempty(c->input_streams)); - idxset_free(c->input_streams, NULL, NULL); + assert(idxset_isempty(c->sink_inputs)); + idxset_free(c->sink_inputs, NULL, NULL); free(c); }; @@ -7,7 +7,7 @@ struct core { struct mainloop *mainloop; - struct idxset *clients, *sinks, *sources, *output_streams, *input_streams, *modules; + struct idxset *clients, *sinks, *sources, *sink_inputs, *source_outputs, *modules; uint32_t default_source_index, default_sink_index; }; diff --git a/src/idxset.c b/src/idxset.c index 4442190d..ea609f60 100644 --- a/src/idxset.c +++ b/src/idxset.c @@ -291,9 +291,7 @@ void* idxset_rrobin(struct idxset *s, uint32_t *index) { if (!e) return NULL; - if (index) - *index = e->index; - + *index = e->index; return e->data; } @@ -308,6 +306,22 @@ void* idxset_first(struct idxset *s, uint32_t *index) { return s->iterate_list_head->data; } +void *idxset_next(struct idxset *s, uint32_t *index) { + struct idxset_entry **a, *e = NULL; + assert(s && index); + + if ((a = array_index(s, *index)) && *a) + e = (*a)->iterate_next; + + if (e) { + *index = e->index; + return e->data; + } else { + *index = IDXSET_INVALID; + return NULL; + } +} + int idxset_foreach(struct idxset*s, int (*func)(void *p, uint32_t index, int *del, void*userdata), void *userdata) { struct idxset_entry *e; @@ -341,3 +355,4 @@ int idxset_isempty(struct idxset *s) { assert(s); return s->n_entries == 0; } + diff --git a/src/idxset.h b/src/idxset.h index fdcb7b54..90b9d247 100644 --- a/src/idxset.h +++ b/src/idxset.h @@ -26,10 +26,12 @@ void* idxset_rrobin(struct idxset *s, uint32_t *index); /* Return the oldest entry in the idxset */ void* idxset_first(struct idxset *s, uint32_t *index); +void *idxset_next(struct idxset *s, uint32_t *index); int idxset_foreach(struct idxset*s, int (*func)(void *p, uint32_t index, int *del, void*userdata), void *userdata); unsigned idxset_ncontents(struct idxset*s); int idxset_isempty(struct idxset *s); + #endif diff --git a/src/module-pipe-sink.c b/src/module-pipe-sink.c index e63a7a86..c0a903e9 100644 --- a/src/module-pipe-sink.c +++ b/src/module-pipe-sink.c @@ -52,9 +52,9 @@ static void do_write(struct userdata *u) { } } -static void notify_callback(struct sink*s, void *userdata) { - struct userdata *u = userdata; - assert(u); +static void notify_cb(struct sink*s) { + struct userdata *u = s->userdata; + assert(s && u); if (iochannel_is_writable(u->io)) mainloop_source_enable(u->mainloop_source, 1); @@ -77,7 +77,7 @@ int module_init(struct core *c, struct module*m) { struct stat st; char *p; int fd = -1; - const static struct sample_spec ss = { + static const struct sample_spec ss = { .format = SAMPLE_S16NE, .rate = 44100, .channels = 2, @@ -110,7 +110,8 @@ int module_init(struct core *c, struct module*m) { u->core = c; u->sink = sink_new(c, "fifo", &ss); assert(u->sink); - sink_set_notify_callback(u->sink, notify_callback, u); + u->sink->notify = notify_cb; + u->sink->userdata = u; u->io = iochannel_new(c->mainloop, -1, fd); assert(u->io); diff --git a/src/protocol-simple.c b/src/protocol-simple.c index d34a5d02..1803936e 100644 --- a/src/protocol-simple.c +++ b/src/protocol-simple.c @@ -5,17 +5,18 @@ #include <errno.h> #include <string.h> -#include "inputstream.h" -#include "outputstream.h" +#include "sinkinput.h" +#include "sourceoutput.h" #include "protocol-simple.h" #include "client.h" struct connection { struct protocol_simple *protocol; struct iochannel *io; - struct input_stream *istream; - struct output_stream *ostream; + struct sink_input *sink_input; + struct source_output *source_output; struct client *client; + struct memblockq *input_memblockq, *output_memblockq; }; struct protocol_simple { @@ -31,14 +32,18 @@ static void free_connection(void *data, void *userdata) { struct connection *c = data; assert(data); - if (c->istream) - input_stream_free(c->istream); - if (c->ostream) - output_stream_free(c->ostream); - - client_free(c->client); - - iochannel_free(c->io); + if (c->sink_input) + sink_input_free(c->sink_input); + if (c->source_output) + source_output_free(c->source_output); + if (c->client) + client_free(c->client); + if (c->io) + iochannel_free(c->io); + if (c->input_memblockq) + memblockq_free(c->input_memblockq); + if (c->output_memblockq) + memblockq_free(c->output_memblockq); free(c); } @@ -48,24 +53,6 @@ static void destroy_connection(struct connection *c) { free_connection(c, NULL); } -static void istream_kill_cb(struct input_stream *i, void *userdata) { - struct connection *c = userdata; - assert(i && c); - destroy_connection(c); -} - -static void ostream_kill_cb(struct output_stream *o, void *userdata) { - struct connection *c = userdata; - assert(o && c); - destroy_connection(c); -} - -static void client_kill_cb(struct client *client, void*userdata) { - struct connection *c= userdata; - assert(client && c); - destroy_connection(c); -} - static int do_read(struct connection *c) { struct memchunk chunk; ssize_t r; @@ -73,7 +60,7 @@ static int do_read(struct connection *c) { if (!iochannel_is_readable(c->io)) return 0; - if (!c->istream || !memblockq_is_writable(c->istream->memblockq, BUFSIZE)) + if (!c->sink_input || !memblockq_is_writable(c->input_memblockq, BUFSIZE)) return 0; chunk.memblock = memblock_new(BUFSIZE); @@ -90,10 +77,11 @@ static int do_read(struct connection *c) { chunk.memblock->length = r; chunk.length = r; chunk.index = 0; - - memblockq_push(c->istream->memblockq, &chunk, 0); - input_stream_notify_sink(c->istream); + + assert(c->input_memblockq); + memblockq_push(c->input_memblockq, &chunk, 0); memblock_unref(chunk.memblock); + sink_notify(c->sink_input->sink); return 0; } @@ -104,10 +92,11 @@ static int do_write(struct connection *c) { if (!iochannel_is_writable(c->io)) return 0; - if (!c->ostream) + if (!c->source_output) return 0; - memblockq_peek(c->ostream->memblockq, &chunk); + assert(c->output_memblockq); + memblockq_peek(c->output_memblockq, &chunk); assert(chunk.memblock && chunk.length); if ((r = iochannel_write(c->io, chunk.memblock->data+chunk.index, chunk.length)) < 0) { @@ -116,11 +105,63 @@ static int do_write(struct connection *c) { return -1; } - memblockq_drop(c->ostream->memblockq, r); + memblockq_drop(c->output_memblockq, r); memblock_unref(chunk.memblock); return 0; } +/*** sink_input callbacks ***/ + +static int sink_input_peek_cb(struct sink_input *i, struct memchunk *chunk, uint8_t *volume) { + struct connection*c = i->userdata; + assert(i && c && chunk && volume); + + if (memblockq_peek(c->input_memblockq, chunk) < 0) + return -1; + + *volume = 0xFF; + return 0; +} + +static void sink_input_drop_cb(struct sink_input *i, size_t length) { + struct connection*c = i->userdata; + assert(i && c && length); + + memblockq_drop(c->input_memblockq, length); + + if (do_read(c) < 0) + destroy_connection(c); +} + +static void sink_input_kill_cb(struct sink_input *i) { + assert(i && i->userdata); + destroy_connection((struct connection *) i->userdata); +} + +/*** source_output callbacks ***/ + +static void source_output_push_cb(struct source_output *o, struct memchunk *chunk) { + struct connection *c = o->userdata; + assert(o && c && chunk); + + memblockq_push(c->output_memblockq, chunk, 0); +} + +static void source_output_kill_cb(struct source_output *o) { + assert(o && o->userdata); + destroy_connection((struct connection *) o->userdata); +} + + +/*** client callbacks ***/ + +static void client_kill_cb(struct client *c) { + assert(c && c->userdata); + destroy_connection((struct connection *) c->userdata); +} + +/*** iochannel callbacks ***/ + static void io_callback(struct iochannel*io, void *userdata) { struct connection *c = userdata; assert(io && c && c->io == io); @@ -129,13 +170,7 @@ static void io_callback(struct iochannel*io, void *userdata) { destroy_connection(c); } -static void istream_notify_cb(struct input_stream *i, void *userdata) { - struct connection*c = userdata; - assert(i && c && c->istream == i); - - if (do_read(c) < 0) - destroy_connection(c); -} +/*** socket_server callbacks */ static void on_connection(struct socket_server*s, struct iochannel *io, void *userdata) { struct protocol_simple *p = userdata; @@ -145,39 +180,53 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us c = malloc(sizeof(struct connection)); assert(c); c->io = io; - c->istream = NULL; - c->ostream = NULL; + c->sink_input = NULL; + c->source_output = NULL; + c->input_memblockq = c->output_memblockq = NULL; c->protocol = p; c->client = client_new(p->core, "SIMPLE", "Client"); assert(c->client); - client_set_kill_callback(c->client, client_kill_cb, c); + c->client->kill = client_kill_cb; + c->client->userdata = c; if (p->mode & PROTOCOL_SIMPLE_RECORD) { struct source *source; + size_t l; if (!(source = core_get_default_source(p->core))) { fprintf(stderr, "Failed to get default source.\n"); goto fail; } - c->ostream = output_stream_new(source, &DEFAULT_SAMPLE_SPEC, c->client->name); - assert(c->ostream); - output_stream_set_kill_callback(c->ostream, ostream_kill_cb, c); + c->source_output = source_output_new(source, &DEFAULT_SAMPLE_SPEC, c->client->name); + assert(c->source_output); + c->source_output->push = source_output_push_cb; + c->source_output->kill = source_output_kill_cb; + c->source_output->userdata = c; + + l = 5*bytes_per_second(&DEFAULT_SAMPLE_SPEC); + c->output_memblockq = memblockq_new(l, sample_size(&DEFAULT_SAMPLE_SPEC), l/2); } if (p->mode & PROTOCOL_SIMPLE_PLAYBACK) { struct sink *sink; + size_t l; if (!(sink = core_get_default_sink(p->core))) { fprintf(stderr, "Failed to get default sink.\n"); goto fail; } - c->istream = input_stream_new(sink, &DEFAULT_SAMPLE_SPEC, c->client->name); - assert(c->istream); - input_stream_set_kill_callback(c->istream, istream_kill_cb, c); - input_stream_set_notify_callback(c->istream, istream_notify_cb, c); + c->sink_input = sink_input_new(sink, &DEFAULT_SAMPLE_SPEC, c->client->name); + assert(c->sink_input); + c->sink_input->peek = sink_input_peek_cb; + c->sink_input->drop = sink_input_drop_cb; + c->sink_input->kill = sink_input_kill_cb; + c->sink_input->userdata = c; + + l = 5*bytes_per_second(&DEFAULT_SAMPLE_SPEC); + c->input_memblockq = memblockq_new(l, sample_size(&DEFAULT_SAMPLE_SPEC), l/2); } @@ -187,13 +236,7 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us fail: if (c) { - if (c->client) - client_free(c->client); - if (c->istream) - input_stream_free(c->istream); - if (c->ostream) - output_stream_free(c->ostream); - + free_connection(c, NULL); iochannel_free(c->io); free(c); } diff --git a/src/sample.c b/src/sample.c index 6a000228..21c04628 100644 --- a/src/sample.c +++ b/src/sample.c @@ -60,9 +60,9 @@ size_t bytes_per_second(struct sample_spec *spec) { return spec->rate*sample_size(spec); } -size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct sample_spec *spec) { +size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct sample_spec *spec, uint8_t volume) { unsigned c, d; - assert(chunks && target && spec); + assert(channels && data && length && spec); assert(spec->format == SAMPLE_S16NE); for (d = 0;; d += sizeof(int16_t)) { @@ -81,7 +81,7 @@ size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, si if (volume == 0) v = 0; else { - v = *((int16_t*) (channels[c].chunk->memblock->data + channels[c].chunk->index + d)); + v = *((int16_t*) (channels[c].chunk.memblock->data + channels[c].chunk.index + d)); if (volume != 0xFF) v = v*volume/0xFF; @@ -90,8 +90,15 @@ size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, si sum += v; } + if (volume == 0) + sum = 0; + else if (volume != 0xFF) + sum = sum*volume/0xFF; + if (sum < -0x8000) sum = -0x8000; if (sum > 0x7FFF) sum = 0x7FFF; - *(data++) = sum; + + *((int16_t*) data) = sum; + data += sizeof(int16_t); } } diff --git a/src/sample.h b/src/sample.h index 651788ba..f8ba6698 100644 --- a/src/sample.h +++ b/src/sample.h @@ -35,7 +35,7 @@ struct mix_info { void *userdata; }; -size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct sample_spec *spec) { +size_t mix_chunks(struct mix_info channels[], unsigned nchannels, void *data, size_t length, struct sample_spec *spec, uint8_t volume); size_t bytes_per_second(struct sample_spec *spec); size_t sample_size(struct sample_spec *spec); @@ -4,7 +4,9 @@ #include <stdio.h> #include "sink.h" -#include "inputstream.h" +#include "sinkinput.h" + +#define MAX_MIX_CHANNELS 32 struct sink* sink_new(struct core *core, const char *name, const struct sample_spec *spec) { struct sink *s; @@ -16,9 +18,6 @@ struct sink* sink_new(struct core *core, const char *name, const struct sample_s assert(s); s->name = name ? strdup(name) : NULL; - r = idxset_put(core->sinks, s, &s->index); - assert(s->index != IDXSET_INVALID && r >= 0); - s->core = core; s->sample_spec = *spec; s->inputs = idxset_new(NULL, NULL); @@ -34,8 +33,11 @@ struct sink* sink_new(struct core *core, const char *name, const struct sample_s s->volume = 0xFF; s->notify = NULL; - s->notify_userdata = NULL; + s->userdata = NULL; + r = idxset_put(core->sinks, s, &s->index); + assert(s->index != IDXSET_INVALID && r >= 0); + fprintf(stderr, "sink: created %u \"%s\".\n", s->index, s->name); return s; @@ -46,15 +48,14 @@ void sink_free(struct sink *s) { assert(s); while ((i = idxset_first(s->inputs, NULL))) { - assert(i != j && i->kill); - i->kill(i); + assert(i != j); + sink_input_kill(i); j = i; } - idxset_free(s->inputs, NULL, NULL); - - idxset_remove_by_data(s->core->sinks, s, NULL); + source_free(s->monitor_source); + idxset_remove_by_data(s->core->sinks, s, NULL); fprintf(stderr, "sink: freed %u \"%s\"\n", s->index, s->name); @@ -66,24 +67,17 @@ void sink_notify(struct sink*s) { assert(s); if (s->notify) - s->notify(s, s->notify_userdata); -} - -void sink_set_notify_callback(struct sink *s, void (*notify_callback)(struct sink*sink, void *userdata), void *userdata) { - assert(s && notify_callback); - - s->notify = notify_callback; - s->notify_userdata = userdata; + s->notify(s); } static unsigned fill_mix_info(struct sink *s, struct mix_info *info, unsigned maxinfo) { - uint32_t index = IDXSET_ANY; + uint32_t index = IDXSET_INVALID; struct sink_input *i; - unsigned n; + unsigned n = 0; assert(s && info); - while (maxinfo > 0 && i = idxset_rrobin(s->inputs, &index)) { + for (i = idxset_first(s->inputs, &index); maxinfo > 0 && i; i = idxset_next(s->inputs, &index)) { assert(i->peek); if (i->peek(i, &info->chunk, &info->volume) < 0) continue; @@ -125,7 +119,7 @@ int sink_render(struct sink*s, size_t length, struct memchunk *result) { if (n == 1) { struct sink_info *i = info[0].userdata; - assert(i && b); + assert(i); *result = info[0].chunk; memblock_ref(result->memblock); @@ -137,7 +131,7 @@ int sink_render(struct sink*s, size_t length, struct memchunk *result) { result->memblock = memblock_new(length); assert(result->memblock); - result->length = l = mix_chunks(info, n, result->memblock->data, length, &s->sample_spec); + result->length = l = mix_chunks(info, n, result->memblock->data, length, &s->sample_spec, s->volume); result->index = 0; assert(l); @@ -160,7 +154,7 @@ int sink_render_into(struct sink*s, struct memblock *target, struct memchunk *re if (n == 1) { struct sink_info *i = info[0].userdata; - assert(i && b); + assert(i); l = target->length; if (l > info[0].chunk.length) @@ -170,15 +164,10 @@ int sink_render_into(struct sink*s, struct memblock *target, struct memchunk *re memcpy(target->data, info[0].chunk.memblock->data + info[0].chunk.index, l); result->length = target->length = l; result->index = 0; - - if (result->length > length) - result->length = length; - - l = result->length; } else { result->memblock = target; - result->length = l = mix_chunks(info, n, target->data, target->length, &s->sample_spec); + result->length = l = mix_chunks(info, n, target->data, target->length, &s->sample_spec, s->volume); result->index = 0; assert(l); } @@ -10,21 +10,10 @@ struct sink; #include "idxset.h" #include "source.h" - -struct sink_input { - int (*peek) (struct sink_input *i, struct memchunk *chunk, uint8_t *volume); - void (*drop) (struct sink_input *i, size_t length); - void (*kill) (struct sink_input *i); - - void *userdata; - int index; - struct sink *sink; -}; - struct sink { - char *name; uint32_t index; - + + char *name; struct core *core; struct sample_spec sample_spec; struct idxset *inputs; @@ -33,8 +22,8 @@ struct sink { uint8_t volume; - void (*notify)(struct sink*sink, void *userdata); - void *notify_userdata; + void (*notify)(struct sink*sink); + void *userdata; }; struct sink* sink_new(struct core *core, const char *name, const struct sample_spec *spec); @@ -44,6 +33,5 @@ int sink_render(struct sink*s, size_t length, struct memchunk *result); int sink_render_into(struct sink*s, struct memblock *target, struct memchunk *result); void sink_notify(struct sink*s); -void sink_set_notify_callback(struct sink *s, void (*notify_callback)(struct sink*sink, void *userdata), void *userdata); #endif diff --git a/src/sinkinput.c b/src/sinkinput.c index 6bc841ac..ae7b27ff 100644 --- a/src/sinkinput.c +++ b/src/sinkinput.c @@ -2,81 +2,47 @@ #include <stdlib.h> #include <string.h> -#include "inputstream.h" +#include "sinkinput.h" -struct input_stream* input_stream_new(struct sink *s, struct sample_spec *spec, const char *name) { - struct input_stream *i; +struct sink_input* sink_input_new(struct sink *s, struct sample_spec *spec, const char *name) { + struct sink_input *i; int r; assert(s && spec); - i = malloc(sizeof(struct input_stream)); + i = malloc(sizeof(struct sink_input)); assert(i); i->name = name ? strdup(name) : NULL; i->sink = s; i->spec = *spec; + i->peek = NULL; + i->drop = NULL; i->kill = NULL; - i->kill_userdata = NULL; - i->notify = NULL; - i->notify_userdata = NULL; + i->userdata = NULL; - i->memblockq = memblockq_new(bytes_per_second(spec)/2, sample_size(spec), (size_t) -1); - assert(i->memblockq); - assert(s->core); - r = idxset_put(s->core->input_streams, i, &i->index); + r = idxset_put(s->core->sink_inputs, i, &i->index); assert(r == 0 && i->index != IDXSET_INVALID); - r = idxset_put(s->input_streams, i, NULL); + r = idxset_put(s->inputs, i, NULL); assert(r == 0); return i; } -void input_stream_free(struct input_stream* i) { +void sink_input_free(struct sink_input* i) { assert(i); - memblockq_free(i->memblockq); - assert(i->sink && i->sink->core); - idxset_remove_by_data(i->sink->core->input_streams, i, NULL); - idxset_remove_by_data(i->sink->input_streams, i, NULL); + idxset_remove_by_data(i->sink->core->sink_inputs, i, NULL); + idxset_remove_by_data(i->sink->inputs, i, NULL); free(i->name); free(i); } -void input_stream_notify_sink(struct input_stream *i) { - assert(i); - - if (!memblockq_is_readable(i->memblockq)) - return; - - sink_notify(i->sink); -} - -void input_stream_set_kill_callback(struct input_stream *i, void (*kill)(struct input_stream*i, void *userdata), void *userdata) { - assert(i && kill); - i->kill = kill; - i->kill_userdata = userdata; -} - - -void input_stream_kill(struct input_stream*i) { +void sink_input_kill(struct sink_input*i) { assert(i); if (i->kill) - i->kill(i, i->kill_userdata); -} - -void input_stream_set_notify_callback(struct input_stream *i, void (*notify)(struct input_stream*i, void *userdata), void *userdata) { - assert(i && notify); - - i->notify = notify; - i->notify_userdata = userdata; -} - -void input_stream_notify(struct input_stream *i) { - assert(i); - if (i->notify) - i->notify(i, i->notify_userdata); + i->kill(i); } diff --git a/src/sinkinput.h b/src/sinkinput.h index 544c3318..5e71d210 100644 --- a/src/sinkinput.h +++ b/src/sinkinput.h @@ -1,5 +1,5 @@ -#ifndef fooinputstreamhfoo -#define fooinputstreamhfoo +#ifndef foosinkinputhfoo +#define foosinkinputhfoo #include <inttypes.h> @@ -7,44 +7,26 @@ #include "sample.h" #include "memblockq.h" -struct input_stream { - char *name; +struct sink_input { uint32_t index; + char *name; struct sink *sink; struct sample_spec spec; - struct memblockq *memblockq; + int (*peek) (struct sink_input *i, struct memchunk *chunk, uint8_t *volume); + void (*drop) (struct sink_input *i, size_t length); + void (*kill) (struct sink_input *i); - void (*kill)(struct input_stream* i, void *userdata); - void *kill_userdata; - - void (*notify)(struct input_stream*i, void *userdata); - void *notify_userdata; + void *userdata; }; -struct input_stream* input_stream_new(struct sink *s, struct sample_spec *spec, const char *name); -void input_stream_free(struct input_stream* i); - -/* This function notifies the attached sink that new data is available - * in the memblockq */ -void input_stream_notify_sink(struct input_stream *i); - - -/* The registrant of the input stream should call this function to set a - * callback function which is called when destruction of the input stream is - * requested */ -void input_stream_set_kill_callback(struct input_stream *i, void (*kill)(struct input_stream*i, void *userdata), void *userdata); +struct sink_input* sink_input_new(struct sink *s, struct sample_spec *spec, const char *name); +void sink_input_free(struct sink_input* i); /* Code that didn't create the input stream should call this function to * request destruction of it */ -void input_stream_kill(struct input_stream *i); - -/* Notify the code that created this input stream that some data has - * been removed from the memblockq */ -void input_stream_set_notify_callback(struct input_stream *i, void (*notify)(struct input_stream*i, void *userdata), void *userdata); - -void input_stream_notify(struct input_stream *i); +void sink_input_kill(struct sink_input *i); #endif diff --git a/src/source.c b/src/source.c index a7fc9a64..e01e9f83 100644 --- a/src/source.c +++ b/src/source.c @@ -4,7 +4,7 @@ #include <string.h> #include "source.h" -#include "outputstream.h" +#include "sourceoutput.h" struct source* source_new(struct core *core, const char *name, const struct sample_spec *spec) { struct source *s; @@ -15,31 +15,31 @@ struct source* source_new(struct core *core, const char *name, const struct samp assert(s); s->name = name ? strdup(name) : NULL; - r = idxset_put(core->sources, s, &s->index); - assert(s->index != IDXSET_INVALID && r >= 0); - s->core = core; s->sample_spec = *spec; - s->output_streams = idxset_new(NULL, NULL); + s->outputs = idxset_new(NULL, NULL); - s->link_change_callback = NULL; + s->notify = NULL; s->userdata = NULL; + r = idxset_put(core->sources, s, &s->index); + assert(s->index != IDXSET_INVALID && r >= 0); + fprintf(stderr, "source: created %u \"%s\"\n", s->index, s->name); return s; } void source_free(struct source *s) { - struct output_stream *o, *j = NULL; + struct source_output *o, *j = NULL; assert(s); - while ((o = idxset_first(s->output_streams, NULL))) { + while ((o = idxset_first(s->outputs, NULL))) { assert(o != j); - output_stream_free(o); + source_output_kill(o); j = o; } - idxset_free(s->output_streams, NULL, NULL); + idxset_free(s->outputs, NULL, NULL); idxset_remove_by_data(s->core->sources, s, NULL); @@ -49,17 +49,24 @@ void source_free(struct source *s) { free(s); } +void source_notify(struct source*s) { + assert(s); + + if (s->notify) + s->notify(s); +} + static int do_post(void *p, uint32_t index, int *del, void*userdata) { struct memchunk *chunk = userdata; - struct output_stream *o = p; - assert(o && o->memblockq && index && del && chunk); + struct source_output *o = p; + assert(o && o->push && index && del && chunk); - memblockq_push(o->memblockq, chunk, 0); + o->push(o, chunk); return 0; } void source_post(struct source*s, struct memchunk *chunk) { assert(s && chunk); - idxset_foreach(s->output_streams, do_post, chunk); + idxset_foreach(s->outputs, do_post, chunk); } diff --git a/src/source.h b/src/source.h index 3beb3f96..1442b9f0 100644 --- a/src/source.h +++ b/src/source.h @@ -10,14 +10,14 @@ struct source; #include "memblock.h" struct source { - char *name; uint32_t index; + char *name; struct core *core; struct sample_spec sample_spec; - struct idxset *output_streams; + struct idxset *outputs; - void (*link_change_callback)(struct source*source, void *userdata); + void (*notify)(struct source*source); void *userdata; }; @@ -27,4 +27,6 @@ void source_free(struct source *s); /* Pass a new memory block to all output streams */ void source_post(struct source*s, struct memchunk *b); +void source_notify(struct source *s); + #endif diff --git a/src/sourceoutput.c b/src/sourceoutput.c index c3f68a0e..d4e7a50f 100644 --- a/src/sourceoutput.c +++ b/src/sourceoutput.c @@ -2,56 +2,46 @@ #include <stdlib.h> #include <string.h> -#include "outputstream.h" +#include "sourceoutput.h" -struct output_stream* output_stream_new(struct source *s, struct sample_spec *spec, const char *name) { - struct output_stream *o; +struct source_output* source_output_new(struct source *s, struct sample_spec *spec, const char *name) { + struct source_output *o; int r; assert(s && spec); - o = malloc(sizeof(struct output_stream)); + o = malloc(sizeof(struct source_output)); assert(o); o->name = name ? strdup(name) : NULL; o->source = s; o->spec = *spec; - o->kill = NULL; - o->kill_userdata = NULL; - o->memblockq = memblockq_new(bytes_per_second(spec)*5, sample_size(spec), (size_t) -1); - assert(o->memblockq); + o->push = NULL; + o->kill = NULL; + o->userdata = NULL; assert(s->core); - r = idxset_put(s->core->output_streams, o, &o->index); + r = idxset_put(s->core->source_outputs, o, &o->index); assert(r == 0 && o->index != IDXSET_INVALID); - r = idxset_put(s->output_streams, o, NULL); + r = idxset_put(s->outputs, o, NULL); assert(r == 0); return o; } -void output_stream_free(struct output_stream* o) { +void source_output_free(struct source_output* o) { assert(o); - memblockq_free(o->memblockq); - assert(o->source && o->source->core); - idxset_remove_by_data(o->source->core->output_streams, o, NULL); - idxset_remove_by_data(o->source->output_streams, o, NULL); + idxset_remove_by_data(o->source->core->source_outputs, o, NULL); + idxset_remove_by_data(o->source->outputs, o, NULL); free(o->name); free(o); } -void output_stream_set_kill_callback(struct output_stream *i, void (*kill)(struct output_stream*i, void *userdata), void *userdata) { - assert(i && kill); - i->kill = kill; - i->kill_userdata = userdata; -} - - -void output_stream_kill(struct output_stream*i) { +void source_output_kill(struct source_output*i) { assert(i); if (i->kill) - i->kill(i, i->kill_userdata); + i->kill(i); } diff --git a/src/sourceoutput.h b/src/sourceoutput.h index c6c0a717..fecfea34 100644 --- a/src/sourceoutput.h +++ b/src/sourceoutput.h @@ -1,27 +1,28 @@ -#ifndef foooutputstreamhfoo -#define foooutputstreamhfoo +#ifndef foosourceoutputhfoo +#define foosourceoutputhfoo #include <inttypes.h> + #include "source.h" #include "sample.h" #include "memblockq.h" -struct output_stream { - char *name; +struct source_output { uint32_t index; + char *name; struct source *source; struct sample_spec spec; - struct memblockq *memblockq; - void (*kill)(struct output_stream* i, void *userdata); - void *kill_userdata; + void (*push)(struct source_output *o, struct memchunk *chunk); + void (*kill)(struct source_output* o); + + void *userdata; }; -struct output_stream* output_stream_new(struct source *s, struct sample_spec *spec, const char *name); -void output_stream_free(struct output_stream* o); +struct source_output* source_output_new(struct source *s, struct sample_spec *spec, const char *name); +void source_output_free(struct source_output* o); -void output_stream_set_kill_callback(struct output_stream *i, void (*kill)(struct output_stream*i, void *userdata), void *userdata); -void output_stream_kill(struct output_stream*i); +void source_output_kill(struct source_output*o); #endif |