summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/client.c14
-rw-r--r--src/client.h15
-rw-r--r--src/core.c6
-rw-r--r--src/idxset.c51
-rw-r--r--src/idxset.h7
-rw-r--r--src/inputstream.c18
-rw-r--r--src/inputstream.h17
-rw-r--r--src/memblockq.c2
-rw-r--r--src/outputstream.c16
-rw-r--r--src/outputstream.h5
-rw-r--r--src/protocol-simple.c32
-rw-r--r--src/sink.c9
-rw-r--r--src/source.c7
13 files changed, 161 insertions, 38 deletions
diff --git a/src/client.c b/src/client.c
index 56d85734..3dd37668 100644
--- a/src/client.c
+++ b/src/client.c
@@ -14,7 +14,7 @@ struct client *client_new(struct core *core, const char *protocol_name, char *na
c->protocol_name = protocol_name;
c->name = name ? strdup(name) : NULL;
c->kill = NULL;
- c->userdata = NULL;
+ c->kill_userdata = NULL;
c->core = core;
r = idxset_put(core->clients, c, &c->index);
@@ -30,3 +30,15 @@ void client_free(struct client *c) {
free(c->name);
free(c);
}
+
+void client_set_kill_callback(struct client *c, void (*kill)(struct client *c, void *userdata), void *userdata) {
+ assert(c && kill);
+ c->kill = kill;
+ c->kill_userdata = userdata;
+}
+
+void client_kill(struct client *c) {
+ assert(c);
+ c->kill(c, c->kill_userdata);
+}
+
diff --git a/src/client.h b/src/client.h
index 7128a452..8d9e519c 100644
--- a/src/client.h
+++ b/src/client.h
@@ -9,13 +9,24 @@ struct client {
const char *protocol_name;
- void *userdata;
- void (*kill)(struct client *c);
+ void *kill_userdata;
+ void (*kill)(struct client *c, void *userdata);
struct core *core;
};
struct client *client_new(struct core *c, const char *protocol_name, char *name);
+
+/* This function should be called only by the code that created the client */
void client_free(struct client *c);
+/* The registrant of the client should call this function to set a
+ * callback function which is called when destruction of the client is
+ * requested */
+void client_set_kill_callback(struct client *c, void (*kill)(struct client *c, void *userdata), void *userdata);
+
+/* Code that didn't create the client should call this function to
+ * request destruction of the client */
+void client_kill(struct client *c);
+
#endif
diff --git a/src/core.c b/src/core.c
index 7cfa66e3..0457f4f3 100644
--- a/src/core.c
+++ b/src/core.c
@@ -57,11 +57,10 @@ struct sink* core_get_default_sink(struct core *c) {
if ((sink = idxset_get_by_index(c->sinks, c->default_sink_index)))
return sink;
- if (!(sink = idxset_rrobin(c->sinks, NULL)))
+ if (!(sink = idxset_first(c->sinks, &c->default_sink_index)))
return NULL;
fprintf(stderr, "Default sink vanished, setting to %u\n", sink->index);
- c->default_sink_index = sink->index;
return sink;
}
@@ -72,10 +71,9 @@ struct source* core_get_default_source(struct core *c) {
if ((source = idxset_get_by_index(c->sources, c->default_source_index)))
return source;
- if (!(source = idxset_rrobin(c->sources, NULL)))
+ if (!(source = idxset_first(c->sources, &c->default_source_index)))
return NULL;
fprintf(stderr, "Default source vanished, setting to %u\n", source->index);
- c->default_source_index = source->index;
return source;
}
diff --git a/src/idxset.c b/src/idxset.c
index fe447ac6..4442190d 100644
--- a/src/idxset.c
+++ b/src/idxset.c
@@ -1,3 +1,4 @@
+#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <string.h>
@@ -18,7 +19,7 @@ struct idxset {
int (*compare_func)(void *a, void *b);
unsigned hash_table_size, n_entries;
- struct idxset_entry **hash_table, **array, *iterate_list_head, *iterate_list_tail, *rrobin;
+ struct idxset_entry **hash_table, **array, *iterate_list_head, *iterate_list_tail;
uint32_t index, start_index, array_size;
};
@@ -46,7 +47,6 @@ struct idxset* idxset_new(unsigned (*hash_func) (void *p), int (*compare_func) (
s->index = 0;
s->start_index = 0;
s->n_entries = 0;
- s->rrobin = NULL;
s->iterate_list_head = s->iterate_list_tail = NULL;
@@ -86,9 +86,9 @@ static struct idxset_entry* hash_scan(struct idxset *s, struct idxset_entry* e,
static void extend_array(struct idxset *s, uint32_t index) {
uint32_t i, j, l;
struct idxset_entry** n;
- assert(index >= s->start_index );
+ assert(index >= s->start_index);
- if (index <= s->start_index + s->array_size)
+ if (index < s->start_index + s->array_size)
return;
for (i = 0; i < s->array_size; i++)
@@ -111,13 +111,12 @@ static void extend_array(struct idxset *s, uint32_t index) {
}
static struct idxset_entry** array_index(struct idxset*s, uint32_t index) {
-
if (index >= s->start_index + s->array_size)
return NULL;
if (index < s->start_index)
return NULL;
-
+
return s->array + (index - s->start_index);
}
@@ -214,8 +213,8 @@ static void remove_entry(struct idxset *s, struct idxset_entry *e) {
assert(s && e);
/* Remove from array */
- a = array_index(s, s->index);
- assert(a && *a == e);
+ a = array_index(s, e->index);
+ assert(a && *a && *a == e);
*a = NULL;
/* Remove from linked list */
@@ -238,9 +237,6 @@ static void remove_entry(struct idxset *s, struct idxset_entry *e) {
else
s->hash_table[e->hash_value] = e->hash_next;
- if (s->rrobin == e)
- s->rrobin = NULL;
-
free(e);
assert(s->n_entries >= 1);
@@ -265,7 +261,7 @@ void* idxset_remove_by_index(struct idxset*s, uint32_t index) {
void* idxset_remove_by_data(struct idxset*s, void *data, uint32_t *index) {
struct idxset_entry *e;
unsigned h;
-
+
assert(s->hash_func);
h = s->hash_func(data) % s->hash_table_size;
@@ -283,23 +279,36 @@ void* idxset_remove_by_data(struct idxset*s, void *data, uint32_t *index) {
}
void* idxset_rrobin(struct idxset *s, uint32_t *index) {
- assert(s);
+ struct idxset_entry **a, *e = NULL;
+ assert(s && index);
+
+ if ((a = array_index(s, *index)) && *a)
+ e = (*a)->iterate_next;
+
+ if (!e)
+ e = s->iterate_list_head;
- if (s->rrobin)
- s->rrobin = s->rrobin->iterate_next;
+ if (!e)
+ return NULL;
- if (!s->rrobin)
- s->rrobin = s->iterate_list_head;
+ if (index)
+ *index = e->index;
+
+ return e->data;
+}
+
+void* idxset_first(struct idxset *s, uint32_t *index) {
+ assert(s);
- if (!s->rrobin)
+ if (!s->iterate_list_head)
return NULL;
if (index)
- *index = s->rrobin->index;
-
- return s->rrobin->data;
+ *index = s->iterate_list_head->index;
+ return s->iterate_list_head->data;
}
+
int idxset_foreach(struct idxset*s, int (*func)(void *p, uint32_t index, int *del, void*userdata), void *userdata) {
struct idxset_entry *e;
assert(s && func);
diff --git a/src/idxset.h b/src/idxset.h
index f649e23e..fdcb7b54 100644
--- a/src/idxset.h
+++ b/src/idxset.h
@@ -18,8 +18,15 @@ void* idxset_get_by_data(struct idxset*s, void *p, uint32_t *index);
void* idxset_remove_by_index(struct idxset*s, uint32_t index);
void* idxset_remove_by_data(struct idxset*s, void *p, uint32_t *index);
+/* This may be used to iterate through all entries. When called with
+ an invalid index value it returns the first entry, otherwise the
+ next following. The function is best called with *index =
+ IDXSET_VALID first. */
void* idxset_rrobin(struct idxset *s, uint32_t *index);
+/* Return the oldest entry in the idxset */
+void* idxset_first(struct idxset *s, uint32_t *index);
+
int idxset_foreach(struct idxset*s, int (*func)(void *p, uint32_t index, int *del, void*userdata), void *userdata);
unsigned idxset_ncontents(struct idxset*s);
diff --git a/src/inputstream.c b/src/inputstream.c
index c7b4b4c7..81719288 100644
--- a/src/inputstream.c
+++ b/src/inputstream.c
@@ -14,6 +14,8 @@ struct input_stream* input_stream_new(struct sink *s, struct sample_spec *spec,
i->name = name ? strdup(name) : NULL;
i->sink = s;
i->spec = *spec;
+ i->kill = NULL;
+ i->kill_userdata = NULL;
i->memblockq = memblockq_new(bytes_per_second(spec)*5, sample_size(spec));
assert(i->memblockq);
@@ -40,7 +42,7 @@ void input_stream_free(struct input_stream* i) {
free(i);
}
-void input_stream_notify(struct input_stream *i) {
+void input_stream_notify_sink(struct input_stream *i) {
assert(i);
if (memblockq_is_empty(i->memblockq))
@@ -48,3 +50,17 @@ void input_stream_notify(struct input_stream *i) {
sink_notify(i->sink);
}
+
+void input_stream_set_kill_callback(struct input_stream *i, void (*kill)(struct input_stream*i, void *userdata), void *userdata) {
+ assert(i && kill);
+ i->kill = kill;
+ i->kill_userdata = userdata;
+}
+
+
+void input_stream_kill(struct input_stream*i) {
+ assert(i);
+
+ if (i->kill)
+ i->kill(i, i->kill_userdata);
+}
diff --git a/src/inputstream.h b/src/inputstream.h
index 0353799e..a258c3d1 100644
--- a/src/inputstream.h
+++ b/src/inputstream.h
@@ -15,11 +15,26 @@ struct input_stream {
struct sample_spec spec;
struct memblockq *memblockq;
+
+ void (*kill)(struct input_stream* i, void *userdata);
+ void *kill_userdata;
};
struct input_stream* input_stream_new(struct sink *s, struct sample_spec *spec, const char *name);
void input_stream_free(struct input_stream* i);
-void input_stream_notify(struct input_stream *i);
+/* This function notifies the attached sink that new data is available
+ * in the memblockq */
+void input_stream_notify_sink(struct input_stream *i);
+
+
+/* The registrant of the input stream should call this function to set a
+ * callback function which is called when destruction of the input stream is
+ * requested */
+void input_stream_set_kill_callback(struct input_stream *c, void (*kill)(struct input_stream*i, void *userdata), void *userdata);
+
+/* Code that didn't create the input stream should call this function to
+ * request destruction of it */
+void input_stream_kill(struct input_stream *c);
#endif
diff --git a/src/memblockq.c b/src/memblockq.c
index a422cf09..6437dd5b 100644
--- a/src/memblockq.c
+++ b/src/memblockq.c
@@ -119,6 +119,8 @@ void memblockq_drop(struct memblockq *bq, size_t length) {
q = bq->blocks;
bq->blocks = bq->blocks->next;
+ if (bq->blocks == NULL)
+ bq->blocks_tail = NULL;
memblock_unref(q->chunk.memblock);
free(q);
diff --git a/src/outputstream.c b/src/outputstream.c
index ffec77da..c6681d29 100644
--- a/src/outputstream.c
+++ b/src/outputstream.c
@@ -14,6 +14,8 @@ struct output_stream* output_stream_new(struct source *s, struct sample_spec *sp
o->name = name ? strdup(name) : NULL;
o->source = s;
o->spec = *spec;
+ o->kill = NULL;
+ o->kill_userdata = NULL;
o->memblockq = memblockq_new(bytes_per_second(spec)*5, sample_size(spec));
assert(o->memblockq);
@@ -39,3 +41,17 @@ void output_stream_free(struct output_stream* o) {
free(o->name);
free(o);
}
+
+void output_stream_set_kill_callback(struct output_stream *i, void (*kill)(struct output_stream*i, void *userdata), void *userdata) {
+ assert(i && kill);
+ i->kill = kill;
+ i->kill_userdata = userdata;
+}
+
+
+void output_stream_kill(struct output_stream*i) {
+ assert(i);
+
+ if (i->kill)
+ i->kill(i, i->kill_userdata);
+}
diff --git a/src/outputstream.h b/src/outputstream.h
index 41054341..c6c0a717 100644
--- a/src/outputstream.h
+++ b/src/outputstream.h
@@ -14,9 +14,14 @@ struct output_stream {
struct sample_spec spec;
struct memblockq *memblockq;
+ void (*kill)(struct output_stream* i, void *userdata);
+ void *kill_userdata;
};
struct output_stream* output_stream_new(struct source *s, struct sample_spec *spec, const char *name);
void output_stream_free(struct output_stream* o);
+void output_stream_set_kill_callback(struct output_stream *i, void (*kill)(struct output_stream*i, void *userdata), void *userdata);
+void output_stream_kill(struct output_stream*i);
+
#endif
diff --git a/src/protocol-simple.c b/src/protocol-simple.c
index e930f9ae..ec121faa 100644
--- a/src/protocol-simple.c
+++ b/src/protocol-simple.c
@@ -42,6 +42,30 @@ static void free_connection(void *data, void *userdata) {
free(c);
}
+static void destroy_connection(struct connection *c) {
+ assert(c && c->protocol);
+ idxset_remove_by_data(c->protocol->connections, c, NULL);
+ free_connection(c, NULL);
+}
+
+static void istream_kill_cb(struct input_stream *i, void *userdata) {
+ struct connection *c = userdata;
+ assert(i && c);
+ destroy_connection(c);
+}
+
+static void ostream_kill_cb(struct output_stream *o, void *userdata) {
+ struct connection *c = userdata;
+ assert(o && c);
+ destroy_connection(c);
+}
+
+static void client_kill_cb(struct client *client, void*userdata) {
+ struct connection *c= userdata;
+ assert(client && c);
+ destroy_connection(c);
+}
+
static void io_callback(struct iochannel*io, void *userdata) {
struct connection *c = userdata;
assert(io && c);
@@ -64,7 +88,7 @@ static void io_callback(struct iochannel*io, void *userdata) {
chunk.index = 0;
memblockq_push(c->istream->memblockq, &chunk, 0);
- input_stream_notify(c->istream);
+ input_stream_notify_sink(c->istream);
memblock_unref(chunk.memblock);
}
@@ -88,8 +112,7 @@ static void io_callback(struct iochannel*io, void *userdata) {
return;
fail:
- idxset_remove_by_data(c->protocol->connections, c, NULL);
- free_connection(c, NULL);
+ destroy_connection(c);
}
static void on_connection(struct socket_server*s, struct iochannel *io, void *userdata) {
@@ -106,6 +129,7 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
c->client = client_new(p->core, "SIMPLE", "Client");
assert(c->client);
+ client_set_kill_callback(c->client, client_kill_cb, c);
if (p->mode & PROTOCOL_SIMPLE_RECORD) {
struct source *source;
@@ -117,6 +141,7 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
c->ostream = output_stream_new(source, &DEFAULT_SAMPLE_SPEC, c->client->name);
assert(c->ostream);
+ output_stream_set_kill_callback(c->ostream, ostream_kill_cb, c);
}
if (p->mode & PROTOCOL_SIMPLE_PLAYBACK) {
@@ -129,6 +154,7 @@ static void on_connection(struct socket_server*s, struct iochannel *io, void *us
c->istream = input_stream_new(sink, &DEFAULT_SAMPLE_SPEC, c->client->name);
assert(c->istream);
+ input_stream_set_kill_callback(c->istream, istream_kill_cb, c);
}
diff --git a/src/sink.c b/src/sink.c
index 02ca3468..dfe1bcb9 100644
--- a/src/sink.c
+++ b/src/sink.c
@@ -40,11 +40,14 @@ struct sink* sink_new(struct core *core, const char *name, const struct sample_s
}
void sink_free(struct sink *s) {
- struct input_stream *i;
+ struct input_stream *i, *j = NULL;
assert(s);
- while ((i = idxset_rrobin(s->input_streams, NULL)))
- input_stream_free(i);
+ while ((i = idxset_first(s->input_streams, NULL))) {
+ assert(i != j);
+ input_stream_kill(i);
+ j = i;
+ }
idxset_free(s->input_streams, NULL, NULL);
idxset_remove_by_data(s->core->sinks, s, NULL);
diff --git a/src/source.c b/src/source.c
index a1e7b245..2d5e9bbd 100644
--- a/src/source.c
+++ b/src/source.c
@@ -28,11 +28,14 @@ struct source* source_new(struct core *core, const char *name, const struct samp
}
void source_free(struct source *s) {
- struct output_stream *o;
+ struct output_stream *o, *j = NULL;
assert(s);
- while ((o = idxset_rrobin(s->output_streams, NULL)))
+ while ((o = idxset_first(s->output_streams, NULL))) {
+ assert(o != j);
output_stream_free(o);
+ j = o;
+ }
idxset_free(s->output_streams, NULL, NULL);
idxset_remove_by_data(s->core->sources, s, NULL);