From 3d374e9f678133638e661cadf73d4ef7ddcfe6eb Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Wed, 11 Aug 2004 00:11:12 +0000 Subject: add subscription subsystem git-svn-id: file:///home/lennart/svn/public/pulseaudio/trunk@112 fefdeb5f-60dc-0310-8127-8f9354f1896f --- polyp/Makefile.am | 3 +- polyp/client.c | 3 + polyp/core.c | 8 ++- polyp/core.h | 4 ++ polyp/module.c | 5 ++ polyp/sink-input.c | 5 ++ polyp/sink.c | 5 ++ polyp/source-output.c | 5 ++ polyp/source.c | 5 ++ polyp/subscribe.c | 152 ++++++++++++++++++++++++++++++++++++++++++++++++++ polyp/subscribe.h | 43 ++++++++++++++ 11 files changed, 236 insertions(+), 2 deletions(-) create mode 100644 polyp/subscribe.c create mode 100644 polyp/subscribe.h diff --git a/polyp/Makefile.am b/polyp/Makefile.am index 254af28a..ce753da4 100644 --- a/polyp/Makefile.am +++ b/polyp/Makefile.am @@ -115,7 +115,8 @@ polypaudio_SOURCES = idxset.c idxset.h \ sound-file.c sound-file.h \ play-memchunk.c play-memchunk.h \ autoload.c autoload.h \ - xmalloc.c xmalloc.h + xmalloc.c xmalloc.h \ + subscribe.h subscribe.c polypaudio_CFLAGS = $(AM_CFLAGS) $(LIBSAMPLERATE_CFLAGS) $(LIBSNDFILE_CFLAGS) polypaudio_INCLUDES = $(INCLTDL) diff --git a/polyp/client.c b/polyp/client.c index 83a6264d..809f9761 100644 --- a/polyp/client.c +++ b/polyp/client.c @@ -30,6 +30,7 @@ #include "client.h" #include "xmalloc.h" +#include "subscribe.h" struct pa_client *pa_client_new(struct pa_core *core, const char *protocol_name, char *name) { struct pa_client *c; @@ -49,6 +50,7 @@ struct pa_client *pa_client_new(struct pa_core *core, const char *protocol_name, assert(c->index != PA_IDXSET_INVALID && r >= 0); fprintf(stderr, "client: created %u \"%s\"\n", c->index, c->name); + pa_subscription_post(core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_NEW, c->index); return c; } @@ -58,6 +60,7 @@ void pa_client_free(struct pa_client *c) { pa_idxset_remove_by_data(c->core->clients, c, NULL); fprintf(stderr, "client: freed %u \"%s\"\n", c->index, c->name); + pa_subscription_post(c->core, PA_SUBSCRIPTION_EVENT_CLIENT|PA_SUBSCRIPTION_EVENT_REMOVE, c->index); pa_xfree(c->name); pa_xfree(c); } diff --git a/polyp/core.c b/polyp/core.c index 0444fa96..55944aad 100644 --- a/polyp/core.c +++ b/polyp/core.c @@ -36,6 +36,7 @@ #include "scache.h" #include "autoload.h" #include "xmalloc.h" +#include "subscribe.h" struct pa_core* pa_core_new(struct pa_mainloop_api *m) { struct pa_core* c; @@ -63,6 +64,10 @@ struct pa_core* pa_core_new(struct pa_mainloop_api *m) { c->auto_unload_time = 20; c->auto_unload_event = NULL; + + c->subscription_defer_event = NULL; + c->subscription_event_queue = NULL; + c->subscriptions = NULL; pa_check_for_sigpipe(); @@ -93,7 +98,8 @@ void pa_core_free(struct pa_core *c) { pa_namereg_free(c); pa_scache_free(c); pa_autoload_free(c); - + pa_subscription_free_all(c); + pa_xfree(c->default_source_name); pa_xfree(c->default_sink_name); diff --git a/polyp/core.h b/polyp/core.h index e1e48cbc..43691771 100644 --- a/polyp/core.h +++ b/polyp/core.h @@ -39,6 +39,10 @@ struct pa_core { struct pa_sample_spec default_sample_spec; int auto_unload_time; struct pa_time_event *auto_unload_event; + + struct pa_defer_event *subscription_defer_event; + struct pa_queue *subscription_event_queue; + struct pa_subscription *subscriptions; }; struct pa_core* pa_core_new(struct pa_mainloop_api *m); diff --git a/polyp/module.c b/polyp/module.c index 83bfa800..849afca4 100644 --- a/polyp/module.c +++ b/polyp/module.c @@ -32,6 +32,7 @@ #include "module.h" #include "xmalloc.h" +#include "subscribe.h" #define UNLOAD_POLL_TIME 10 @@ -92,6 +93,8 @@ struct pa_module* pa_module_load(struct pa_core *c, const char *name, const char assert(r >= 0 && m->index != PA_IDXSET_INVALID); fprintf(stderr, "module: loaded %u \"%s\" with argument \"%s\".\n", m->index, m->name, m->argument); + + pa_subscription_post(c, PA_SUBSCRIPTION_EVENT_MODULE|PA_SUBSCRIPTION_EVENT_NEW, m->index); return m; @@ -117,6 +120,8 @@ static void pa_module_free(struct pa_module *m) { fprintf(stderr, "module: unloaded %u \"%s\".\n", m->index, m->name); + pa_subscription_post(m->core, PA_SUBSCRIPTION_EVENT_MODULE|PA_SUBSCRIPTION_EVENT_REMOVE, m->index); + pa_xfree(m->name); pa_xfree(m->argument); pa_xfree(m); diff --git a/polyp/sink-input.c b/polyp/sink-input.c index 04a2f020..dd7469e0 100644 --- a/polyp/sink-input.c +++ b/polyp/sink-input.c @@ -31,6 +31,7 @@ #include "sink-input.h" #include "sample-util.h" #include "xmalloc.h" +#include "subscribe.h" #define CONVERT_BUFFER_LENGTH 4096 @@ -72,6 +73,8 @@ struct pa_sink_input* pa_sink_input_new(struct pa_sink *s, const char *name, con pa_sample_snprint(st, sizeof(st), spec); fprintf(stderr, "sink-input: created %u \"%s\" on %u with sample spec \"%s\"\n", i->index, i->name, s->index, st); + + pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_NEW, i->index); return i; } @@ -87,6 +90,8 @@ void pa_sink_input_free(struct pa_sink_input* i) { pa_memblock_unref(i->resampled_chunk.memblock); if (i->resampler) pa_resampler_free(i->resampler); + + pa_subscription_post(i->sink->core, PA_SUBSCRIPTION_EVENT_SINK_INPUT|PA_SUBSCRIPTION_EVENT_REMOVE, i->index); pa_xfree(i->name); pa_xfree(i); diff --git a/polyp/sink.c b/polyp/sink.c index 9628d8bd..7e0e15cd 100644 --- a/polyp/sink.c +++ b/polyp/sink.c @@ -34,6 +34,7 @@ #include "util.h" #include "sample-util.h" #include "xmalloc.h" +#include "subscribe.h" #define MAX_MIX_CHANNELS 32 @@ -77,6 +78,8 @@ struct pa_sink* pa_sink_new(struct pa_core *core, const char *name, int fail, co pa_sample_snprint(st, sizeof(st), spec); fprintf(stderr, "sink: created %u \"%s\" with sample spec \"%s\"\n", s->index, s->name, st); + + pa_subscription_post(core, PA_SUBSCRIPTION_EVENT_SINK | PA_SUBSCRIPTION_EVENT_NEW, s->index); return s; } @@ -98,6 +101,8 @@ void pa_sink_free(struct pa_sink *s) { pa_idxset_remove_by_data(s->core->sinks, s, NULL); fprintf(stderr, "sink: freed %u \"%s\"\n", s->index, s->name); + + pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SINK | PA_SUBSCRIPTION_EVENT_REMOVE, s->index); pa_xfree(s->name); pa_xfree(s->description); diff --git a/polyp/source-output.c b/polyp/source-output.c index 07901fa8..c53831c7 100644 --- a/polyp/source-output.c +++ b/polyp/source-output.c @@ -29,6 +29,7 @@ #include "source-output.h" #include "xmalloc.h" +#include "subscribe.h" struct pa_source_output* pa_source_output_new(struct pa_source *s, const char *name, const struct pa_sample_spec *spec) { struct pa_source_output *o; @@ -57,6 +58,8 @@ struct pa_source_output* pa_source_output_new(struct pa_source *s, const char *n assert(r == 0 && o->index != PA_IDXSET_INVALID); r = pa_idxset_put(s->outputs, o, NULL); assert(r == 0); + + pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_NEW, o->index); return o; } @@ -70,6 +73,8 @@ void pa_source_output_free(struct pa_source_output* o) { if (o->resampler) pa_resampler_free(o->resampler); + + pa_subscription_post(o->source->core, PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT|PA_SUBSCRIPTION_EVENT_REMOVE, o->index); pa_xfree(o->name); pa_xfree(o); diff --git a/polyp/source.c b/polyp/source.c index 65c90e9a..2c611651 100644 --- a/polyp/source.c +++ b/polyp/source.c @@ -32,6 +32,7 @@ #include "source-output.h" #include "namereg.h" #include "xmalloc.h" +#include "subscribe.h" struct pa_source* pa_source_new(struct pa_core *core, const char *name, int fail, const struct pa_sample_spec *spec) { struct pa_source *s; @@ -63,6 +64,8 @@ struct pa_source* pa_source_new(struct pa_core *core, const char *name, int fail pa_sample_snprint(st, sizeof(st), spec); fprintf(stderr, "source: created %u \"%s\" with sample spec \"%s\"\n", s->index, s->name, st); + + pa_subscription_post(core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_NEW, s->index); return s; } @@ -84,6 +87,8 @@ void pa_source_free(struct pa_source *s) { fprintf(stderr, "source: freed %u \"%s\"\n", s->index, s->name); + pa_subscription_post(s->core, PA_SUBSCRIPTION_EVENT_SOURCE | PA_SUBSCRIPTION_EVENT_REMOVE, s->index); + pa_xfree(s->name); pa_xfree(s->description); pa_xfree(s); diff --git a/polyp/subscribe.c b/polyp/subscribe.c new file mode 100644 index 00000000..541657ae --- /dev/null +++ b/polyp/subscribe.c @@ -0,0 +1,152 @@ +#include +#include + +#include "queue.h" +#include "subscribe.h" +#include "xmalloc.h" + +struct pa_subscription { + struct pa_core *core; + int dead; + void (*callback)(struct pa_core *c, enum pa_subscription_event_type t, uint32_t index, void *userdata); + void *userdata; + enum pa_subscription_mask mask; + + struct pa_subscription *prev, *next; +}; + +struct pa_subscription_event { + enum pa_subscription_event_type type; + uint32_t index; +}; + +static void sched_event(struct pa_core *c); + +struct pa_subscription* pa_subscription_new(struct pa_core *c, enum pa_subscription_mask m, void (*callback)(struct pa_core *c, enum pa_subscription_event_type t, uint32_t index, void *userdata), void *userdata) { + struct pa_subscription *s; + assert(c); + + s = pa_xmalloc(sizeof(struct pa_subscription)); + s->core = c; + s->dead = 0; + s->callback = callback; + s->userdata = userdata; + s->mask = m; + + if ((s->next = c->subscriptions)) + s->next->prev = s; + s->prev = NULL; + c->subscriptions = s; + return NULL; +} + +void pa_subscription_free(struct pa_subscription*s) { + assert(s && !s->dead); + s->dead = 1; + sched_event(s->core); +} + +static void free_item(struct pa_subscription *s) { + assert(s && s->core); + + if (s->prev) + s->prev->next = s->next; + else + s->core->subscriptions = s->next; + + if (s->next) + s->next->prev = s->prev; + + pa_xfree(s); +} + +void pa_subscription_free_all(struct pa_core *c) { + struct pa_subscription_event *e; + assert(c); + + while (c->subscriptions) + free_item(c->subscriptions); + + if (c->subscription_event_queue) { + while ((e = pa_queue_pop(c->subscription_event_queue))) + pa_xfree(e); + + pa_queue_free(c->subscription_event_queue, NULL, NULL); + c->subscription_event_queue = NULL; + } + + if (c->subscription_defer_event) { + c->mainloop->defer_free(c->subscription_defer_event); + c->subscription_defer_event = NULL; + } +} + +static void defer_cb(struct pa_mainloop_api *m, struct pa_defer_event *e, void *userdata) { + struct pa_core *c = userdata; + struct pa_subscription *s; + assert(c && c->subscription_defer_event == e && c->mainloop == m); + + c->mainloop->defer_enable(c->subscription_defer_event, 0); + + + /* Dispatch queued events */ + + if (c->subscription_event_queue) { + struct pa_subscription_event *e; + + while ((e = pa_queue_pop(c->subscription_event_queue))) { + struct pa_subscription *s; + + for (s = c->subscriptions; s; s = s->next) { + if (!s->dead && pa_subscription_match_flags(s->mask, e->type)) + s->callback(c, e->type, e->index, s->userdata); + } + + pa_xfree(e); + } + } + + /* Remove dead subscriptions */ + + s = c->subscriptions; + while (s) { + struct pa_subscription *n = s->next; + if (s->dead) + free_item(s); + s = n; + } +} + +static void sched_event(struct pa_core *c) { + assert(c); + + if (!c->subscription_defer_event) { + c->subscription_defer_event = c->mainloop->defer_new(c->mainloop, defer_cb, c); + assert(c->subscription_defer_event); + } + + c->mainloop->defer_enable(c->subscription_defer_event, 1); +} + + +void pa_subscription_post(struct pa_core *c, enum pa_subscription_event_type t, uint32_t index) { + struct pa_subscription_event *e; + assert(c); + + e = pa_xmalloc(sizeof(struct pa_subscription_event)); + e->type = t; + e->index = index; + + if (!c->subscription_event_queue) { + c->subscription_event_queue = pa_queue_new(); + assert(c->subscription_event_queue); + } + + pa_queue_push(c->subscription_event_queue, e); + sched_event(c); +} + + +int pa_subscription_match_flags(enum pa_subscription_mask m, enum pa_subscription_event_type t) { + return !!(m & (1 >> (t & PA_SUBSCRIPTION_EVENT_FACILITY_MASK))); +} diff --git a/polyp/subscribe.h b/polyp/subscribe.h new file mode 100644 index 00000000..1561249f --- /dev/null +++ b/polyp/subscribe.h @@ -0,0 +1,43 @@ +#ifndef foosubscribehfoo +#define foosubscribehfoo + +#include "core.h" + +enum pa_subscription_mask { + PA_SUBSCRIPTION_FACILITY_SINK = 1, + PA_SUBSCRIPTION_FACILITY_SOURCE = 2, + PA_SUBSCRIPTION_FACILITY_SINK_INPUT = 4, + PA_SUBSCRIPTION_FACILITY_SOURCE_OUTPUT = 8, + PA_SUBSCRIPTION_FACILITY_MODULE = 16, + PA_SUBSCRIPTION_FACILITY_CLIENT = 32, + PA_SUBSCRIPTION_FACILITY_SAMPLE_CACHE = 64, +}; + +enum pa_subscription_event_type { + PA_SUBSCRIPTION_EVENT_SINK = 0, + PA_SUBSCRIPTION_EVENT_SOURCE = 1, + PA_SUBSCRIPTION_EVENT_SINK_INPUT = 2, + PA_SUBSCRIPTION_EVENT_SOURCE_OUTPUT = 3, + PA_SUBSCRIPTION_EVENT_MODULE = 4, + PA_SUBSCRIPTION_EVENT_CLIENT = 5, + PA_SUBSCRIPTION_EVENT_SAMPLE_CACHE = 6, + PA_SUBSCRIPTION_EVENT_FACILITY_MASK = 7, + + PA_SUBSCRIPTION_EVENT_NEW = 0, + PA_SUBSCRIPTION_EVENT_CHANGE = 16, + PA_SUBSCRIPTION_EVENT_REMOVE = 32, + PA_SUBSCRIPTION_EVENT_TYPE_MASK = 16+32, +}; + +struct pa_subscription; +struct pa_subscription_event; + +struct pa_subscription* pa_subscription_new(struct pa_core *c, enum pa_subscription_mask m, void (*callback)(struct pa_core *c, enum pa_subscription_event_type t, uint32_t index, void *userdata), void *userdata); +void pa_subscription_free(struct pa_subscription*s); +void pa_subscription_free_all(struct pa_core *c); + +void pa_subscription_post(struct pa_core *c, enum pa_subscription_event_type t, uint32_t index); + +int pa_subscription_match_flags(enum pa_subscription_mask m, enum pa_subscription_event_type e); + +#endif -- cgit