diff options
-rw-r--r-- | src/client.c | 14 | ||||
-rw-r--r-- | src/client.h | 15 | ||||
-rw-r--r-- | src/core.c | 6 | ||||
-rw-r--r-- | src/idxset.c | 51 | ||||
-rw-r--r-- | src/idxset.h | 7 | ||||
-rw-r--r-- | src/inputstream.c | 18 | ||||
-rw-r--r-- | src/inputstream.h | 17 | ||||
-rw-r--r-- | src/memblockq.c | 2 | ||||
-rw-r--r-- | src/outputstream.c | 16 | ||||
-rw-r--r-- | src/outputstream.h | 5 | ||||
-rw-r--r-- | src/protocol-simple.c | 32 | ||||
-rw-r--r-- | src/sink.c | 9 | ||||
-rw-r--r-- | src/source.c | 7 |
13 files changed, 161 insertions, 38 deletions
diff --git a/src/client.c b/src/client.c index 56d85734..3dd37668 100644 --- a/src/client.c +++ b/src/client.c @@ -14,7 +14,7 @@ struct client *client_new(struct core *core, const char *protocol_name, char *na c->protocol_name = protocol_name; c->name = name ? strdup(name) : NULL; c->kill = NULL; - c->userdata = NULL; + c->kill_userdata = NULL; c->core = core; r = idxset_put(core->clients, c, &c->index); @@ -30,3 +30,15 @@ void client_free(struct client *c) { free(c->name); free(c); } + +void client_set_kill_callback(struct client *c, void (*kill)(struct client *c, void *userdata), void *userdata) { + assert(c && kill); + c->kill = kill; + c->kill_userdata = userdata; +} + +void client_kill(struct client *c) { + assert(c); + c->kill(c, c->kill_userdata); +} + diff --git a/src/client.h b/src/client.h index 7128a452..8d9e519c 100644 --- a/src/client.h +++ b/src/client.h @@ -9,13 +9,24 @@ struct client { const char *protocol_name; - void *userdata; - void (*kill)(struct client *c); + void *kill_userdata; + void (*kill)(struct client *c, void *userdata); struct core *core; }; struct client *client_new(struct core *c, const char *protocol_name, char *name); + +/* This function should be called only by the code that created the client */ void client_free(struct client *c); +/* The registrant of the client should call this function to set a + * callback function which is called when destruction of the client is + * requested */ +void client_set_kill_callback(struct client *c, void (*kill)(struct client *c, void *userdata), void *userdata); + +/* Code that didn't create the client should call this function to + * request destruction of the client */ +void client_kill(struct client *c); + #endif @@ -57,11 +57,10 @@ struct sink* core_get_default_sink(struct core *c) { if ((sink = idxset_get_by_index(c->sinks, c->default_sink_index))) return sink; - if (!(sink = idxset_rrobin(c->sinks, NULL))) + if (!(sink = idxset_first(c->sinks, &c->default_sink_index))) return NULL; fprintf(stderr, "Default sink vanished, setting to %u\n", sink->index); - c->default_sink_index = sink->index; return sink; } @@ -72,10 +71,9 @@ struct source* core_get_default_source(struct core *c) { if ((source = idxset_get_by_index(c->sources, c->default_source_index))) return source; - if (!(source = idxset_rrobin(c->sources, NULL))) + if (!(source = idxset_first(c->sources, &c->default_source_index))) return NULL; fprintf(stderr, "Default source vanished, setting to %u\n", source->index); - c->default_source_index = source->index; return source; } diff --git a/src/idxset.c b/src/idxset.c index fe447ac6..4442190d 100644 --- a/src/idxset.c +++ b/src/idxset.c @@ -1,3 +1,4 @@ +#include <stdio.h> #include <assert.h> #include <stdlib.h> #include <string.h> @@ -18,7 +19,7 @@ struct idxset { int (*compare_func)(void *a, void *b); unsigned hash_table_size, n_entries; - struct idxset_entry **hash_table, **array, *iterate_list_head, *iterate_list_tail, *rrobin; + struct idxset_entry **hash_table, **array, *iterate_list_head, *iterate_list_tail; uint32_t index, start_index, array_size; }; @@ -46,7 +47,6 @@ struct idxset* idxset_new(unsigned (*hash_func) (void *p), int (*compare_func) ( s->index = 0; s->start_index = 0; s->n_entries = 0; - s->rrobin = NULL; s->iterate_list_head = s->iterate_list_tail = NULL; @@ -86,9 +86,9 @@ static struct idxset_entry* hash_scan(struct idxset *s, struct idxset_entry* e, static void extend_array(struct idxset *s, uint32_t index) { uint32_t i, j, l; struct idxset_entry** n; - assert(index >= s->start_index ); + assert(index >= s->start_index); - if (index <= s->start_index + s->array_size) + if (index < s->start_index + s->array_size) return; for (i = 0; i < s->array_size; i++) @@ -111,13 +111,12 @@ static void extend_array(struct idxset *s, uint32_t index) { } static struct idxset_entry** array_index(struct idxset*s, uint32_t index) { - if (index >= s->start_index + s->array_size) return NULL; if (index < s->start_index) return NULL; - + return s->array + (index - s->start_index); } @@ -214,8 +213,8 @@ static void remove_entry(struct idxset *s, struct idxset_entry *e) { assert(s && e); /* Remove from array */ - a = array_index(s, s->index); - assert(a && *a == e); + a = array_index(s, e->index); + assert(a && *a && *a == e); *a = NULL; /* Remove from linked list */ @@ -238,9 +237,6 @@ static void remove_entry(struct idxset *s, struct idxset_entry *e) { else s->hash_table[e->hash_value] = e->hash_next; - if (s->rrobin == e) - s->rrobin = NULL; - free(e); assert(s->n_entries >= 1); @@ -265,7 +261,7 @@ void* idxset_remove_by_index(struct idxset*s, uint32_t index) { void* idxset_remove_by_data(struct idxset*s, void *data, uint32_t *index) { struct idxset_entry *e; unsigned h; - + assert(s->hash_func); h = s->hash_func(data) % s->hash_table_size; @@ -283,23 +279,36 @@ void* idxset_remove_by_data(struct idxset*s, void *data, uint32_t *index) { } void* idxset_rrobin(struct idxset *s, uint32_t *index) { - assert(s); + struct idxset_entry **a, *e = NULL; + assert(s && index); + + if ((a = array_index(s, *index)) && *a) + e = (*a)->iterate_next; + + if (!e) + e = s->iterate_list_head; - if (s->rrobin) - s->rrobin = s->rrobin->iterate_next; + if (!e) + return NULL; - if (!s->rrobin) - s->rrobin = s->iterate_list_head; + if (index) + *index = e->index; + + return e->data; +} + +void* idxset_first(struct idxset *s, uint32_t *index) { + assert(s); - if (!s->rrobin) + if (!s->iterate_list_head) return NULL; if (index) - *index = s->rrobin->index; - - return s->rrobin->data; + *index = s->iterate_list_head->index; + return s->iterate_list_head->data; } + int idxset_foreach(struct idxset*s, int (*func)(void *p, uint32_t index, int *del, void*userdata), void *userdata) { struct idxset_entry *e; assert(s && func); diff --git a/src/idxset.h b/src/idxset.h index f649e23e..fdcb7b54 100644 --- a/src/idxset.h +++ b/src/idxset.h @@ -18,8 +18,15 @@ void* idxset_get_by_data(struct idxset*s, void *p, uint32_t *index); void* idxset_remove_by_index(struct idxset*s, uint32_t index); void* idxset_remove_by_data(struct idxset*s, void *p, uint32_t *index); +/* This may be used to iterate through all entries. When called with + an invalid index value it returns the first entry, otherwise the + next following. The function is best called with *index = + IDXSET_VALID first. */ void* idxset_rrobin(struct idxset *s, uint32_t *index); +/* Return the oldest entry in the idxset */ +void* idxset_first(struct idxset *s, uint32_t *index); + int idxset_foreach(struct idxset*s, int (*func)(void *p, uint32_t index, int *del, void*userdata), void *userdata); unsigned idxset_ncontents(struct idxset*s); diff --git a/src/inputstream.c b/src/inputstream.c index c7b4b4c7..81719288 100644 --- a/src/inputstream.c +++ b/src/inputstream.c @@ -14,6 +14,8 @@ struct input_stream* input_stream_new(struct sink *s, struct sample_spec *spec, i->name = name ? strdup(name) : NULL; i->sink = s; i->spec = *spec; + i->kill = NULL; + i->kill_userdata = NULL; i->memblockq = memblockq_new(bytes_per_second(spec)*5, sample_size(spec)); assert(i->memblockq); @@ -40,7 +42,7 @@ void input_stream_free(struct input_stream* i) { free(i); } -void input_stream_notify(struct input_stream *i) { +void input_stream_notify_sink(struct input_stream *i) { assert(i); if (memblockq_is_empty(i->memblockq)) @@ -48,3 +50,17 @@ void input_stream_notify(struct input_stream *i) { sink_notify(i->sink); } + +void input_stream_set_kill_callback(struct input_stream *i, void (*kill)(struct input_stream*i, void *userdata), void *userdata) { + assert(i && kill); + i->kill = kill; + i->kill_userdata = userdata; +} + + +void input_stream_kill(struct input_stream*i) { + assert(i); + + if (i->kill) + i->kill(i, i->kill_userdata); +} diff --git a/src/inputstream.h b/src/inputstream.h index 0353799e..a258c3d1 100644 --- a/src/inputstream.h +++ b/src/inputstream.h @@ -15,11 +15,26 @@ struct input_stream { struct sample_spec spec; struct memblockq *memblockq; + + void (*kill)(struct input_stream* i, void *userdata); + void *kill_userdata; }; struct input_stream* input_stream_new(struct sink *s, struct sample_spec *spec, const char *name); void input_stream_free(struct input_stream* i); -void input_stream_notify(struct input_stream *i); +/* This function notifies the attached sink that new data is available + * in the memblockq */ +void input_stream_notify_sink(struct input_stream *i); + + +/* The registrant of the input stream should call this function to set a + * callback function which is called when destruction of the input stream is + * requested */ +void input_stream_set_kill_callback(struct input_stream *c, void (*kill)(struct input_stream*i, void *userdata), void *userdata); + +/* Code that didn't create the input stream should call this function to + * request destruction of it */ +void input_stream_kill(struct input_stream *c); #endif diff --git a/src/memblockq.c b/src/memblockq.c index a422cf09..6437dd5b 100644 --- a/src/memblockq.c +++ b/src/memblockq.c @@ -119,6 +119,8 @@ void memblockq_drop(struct memblockq *bq, size_t length) { q = bq->blocks; bq->blocks = bq->blocks->next; + if (bq->blocks == NULL) + bq->blocks_tail = NULL; memblock_unref(q->chunk.memblock); free(q); diff --git a/src/outputstream.c b/src/outputstream.c index ffec77da..c6681d29 100644 --- a/src/outputstream.c +++ b/src/outputstream.c @@ -14,6 +14,8 @@ struct output_stream* output_stream_new(struct source *s, struct sample_spec *sp o->name = name ? strdup(name) : NULL; o->source = s; o->spec = *spec; + o->kill = NULL; + o->kill_userdata = NULL; o->memblockq = memblockq_new(bytes_per_second(spec)*5, sample_size(spec)); assert(o->memblockq); @@ -39,3 +41,17 @@ void output_stream_free(struct output_stream* o) { free(o->name); free(o); } + +void output_stream_set_kill_callback(struct output_stream *i, void (*kill)(struct output_stream*i, void *userdata), void *userdata) { + assert(i && kill); + i->kill = kill; + i->kill_userdata = userdata; +} + + +void output_stream_kill(struct output_stream*i) { + assert(i); + + if (i->kill) + i->kill(i, i->kill_userdata); +} diff --git a/src/outputstream.h b/src/outputstream.h index 41054341..c6c0a717 100644 --- a/src/outputstream.h +++ b/src/outputstream.h @@ -14,9 +14,14 @@ struct output_stream { struct sample_spec spec; struct memblockq *memblockq; + void (*kill)(struct output_stream* i, void *userdata); + void *kill_userdata; }; struct output_stream* output_stream_new(struct source *s, struct sample_spec *spec, const char *name); void output_stream_free(struct output_stream* o); +void output_stream_set_kill_callback(struct output_stream *i, void (*kill)(struct output_stream*i, void *userdata), void *userdata); +void output_stream_kill(struct output_stream*i); + #endif diff --git a/src/protocol-simple.c b/src/protocol-simple.c index e930f9ae..ec121faa 100644 --- a/src/protocol-simple.c +++ b/src/protocol-simple.c @@ -42,6 +42,30 @@ static void free_connection(void *data, void *userdata) { free(c); } +static void destroy_connection(struct connection *c) { + assert(c && c->protocol); + idxset_remove_by_data(c->protocol->connections, c, NULL); + free_connection(c, NULL); +} + +static void istream_kill_cb(struct input_stream *i, void *userdata) { + struct connection *c = userdata; + assert(i && c); + destroy_connection(c); +} + +static void ostream_kill_cb(struct output_stream *o, void *userdata) { + struct connection *c = userdata; + assert(o && c); + destroy_connection(c); +} + +static void client_kill_cb(struct client *client, void*userdata) { + struct connection *c= userdata; + assert(client && c); + destroy_connection(c); +} + static void io_callback(struct iochannel*io, void *userdata) { struct connection *c = userdata; assert(io && c); @@ -64,7 +88,7 @@ static void io_callback(struct iochannel*io, void *userdata) { chunk.index = 0; memblockq_push(c->istream->memblockq, &chunk, 0); - input_stream_notify(c->istream); + input_stream_notify_sink(c->istream); memblock_unref(chunk.memblock); } @@ -88,8 +112,7 @@ static void io_callback(struct iochannel*io, void *userdata) { return; fail: - idxset_remove_by_data(c->protocol->connections, c, NULL); - free_connection(c, NULL); + destroy_connection(c); } static void on_connection(struct socket_server*s, struct iochannel *io, void *userdata) { @@ -106,6 +129,7 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us c->client = client_new(p->core, "SIMPLE", "Client"); assert(c->client); + client_set_kill_callback(c->client, client_kill_cb, c); if (p->mode & PROTOCOL_SIMPLE_RECORD) { struct source *source; @@ -117,6 +141,7 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us c->ostream = output_stream_new(source, &DEFAULT_SAMPLE_SPEC, c->client->name); assert(c->ostream); + output_stream_set_kill_callback(c->ostream, ostream_kill_cb, c); } if (p->mode & PROTOCOL_SIMPLE_PLAYBACK) { @@ -129,6 +154,7 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us c->istream = input_stream_new(sink, &DEFAULT_SAMPLE_SPEC, c->client->name); assert(c->istream); + input_stream_set_kill_callback(c->istream, istream_kill_cb, c); } @@ -40,11 +40,14 @@ struct sink* sink_new(struct core *core, const char *name, const struct sample_s } void sink_free(struct sink *s) { - struct input_stream *i; + struct input_stream *i, *j = NULL; assert(s); - while ((i = idxset_rrobin(s->input_streams, NULL))) - input_stream_free(i); + while ((i = idxset_first(s->input_streams, NULL))) { + assert(i != j); + input_stream_kill(i); + j = i; + } idxset_free(s->input_streams, NULL, NULL); idxset_remove_by_data(s->core->sinks, s, NULL); diff --git a/src/source.c b/src/source.c index a1e7b245..2d5e9bbd 100644 --- a/src/source.c +++ b/src/source.c @@ -28,11 +28,14 @@ struct source* source_new(struct core *core, const char *name, const struct samp } void source_free(struct source *s) { - struct output_stream *o; + struct output_stream *o, *j = NULL; assert(s); - while ((o = idxset_rrobin(s->output_streams, NULL))) + while ((o = idxset_first(s->output_streams, NULL))) { + assert(o != j); output_stream_free(o); + j = o; + } idxset_free(s->output_streams, NULL, NULL); idxset_remove_by_data(s->core->sources, s, NULL); |