diff options
-rw-r--r-- | src/gstreamer.c | 327 |
1 files changed, 207 insertions, 120 deletions
diff --git a/src/gstreamer.c b/src/gstreamer.c index f008849..4173479 100644 --- a/src/gstreamer.c +++ b/src/gstreamer.c @@ -46,6 +46,7 @@ struct outstanding { CA_LLIST_FIELDS(struct outstanding); ca_bool_t dead; uint32_t id; + int err; ca_finish_callback_t callback; void *userdata; GstElement *pipeline; @@ -54,26 +55,38 @@ struct outstanding { struct private { ca_theme_data *theme; - ca_mutex *outstanding_mutex; ca_bool_t signal_semaphore; sem_t semaphore; + + GstBus *mgr_bus; + + /* Everything below protected by the outstanding_mutex */ + ca_mutex *outstanding_mutex; + ca_bool_t mgr_thread_running; ca_bool_t semaphore_allocated; CA_LLIST_HEAD(struct outstanding, outstanding); }; #define PRIVATE(c) ((struct private *) ((c)->private)) +static void* thread_func(void *userdata); +static void send_eos_msg(struct outstanding *out, int err); +static void send_mgr_exit_msg (struct private *p); + static void outstanding_free(struct outstanding *o) { GstBus *bus; ca_assert(o); - bus = gst_pipeline_get_bus(GST_PIPELINE (o->pipeline)); - gst_bus_set_sync_handler(bus, NULL, NULL); - gst_object_unref(bus); + if (o->pipeline) { + bus = gst_pipeline_get_bus(GST_PIPELINE (o->pipeline)); + if (bus != NULL) { + gst_bus_set_sync_handler(bus, NULL, NULL); + gst_object_unref(bus); + } - if (o->pipeline) gst_object_unref(GST_OBJECT(o->pipeline)); + } ca_free(o); } @@ -81,6 +94,7 @@ static void outstanding_free(struct outstanding *o) { int driver_open(ca_context *c) { GError *error = NULL; struct private *p; + pthread_t thread; ca_return_val_if_fail(c, CA_ERROR_INVALID); ca_return_val_if_fail(!PRIVATE(c), CA_ERROR_INVALID); @@ -95,6 +109,7 @@ int driver_open(ca_context *c) { if (!(p = ca_new0(struct private, 1))) return CA_ERROR_OOM; + c->private = p; if (!(p->outstanding_mutex = ca_mutex_new())) { driver_destroy(c); @@ -105,10 +120,21 @@ int driver_open(ca_context *c) { driver_destroy(c); return CA_ERROR_OOM; } - p->semaphore_allocated = TRUE; - c->private = p; + p->mgr_bus = gst_bus_new(); + if (p->mgr_bus == NULL) { + driver_destroy(c); + return CA_ERROR_OOM; + } + gst_bus_set_flushing(p->mgr_bus, FALSE); + + /* Give a reference to the bus to the mgr thread */ + if (pthread_create(&thread, NULL, thread_func, p) < 0) { + driver_destroy(c); + return CA_ERROR_OOM; + } + p->mgr_thread_running = TRUE; return CA_SUCCESS; } @@ -128,33 +154,18 @@ int driver_destroy(ca_context *c) { /* Tell all player threads to terminate */ out = p->outstanding; while (out) { - GstElement *pipeline; - - if (out->dead) { - out = out->next; - continue; - } - - pipeline = out->pipeline; - out->dead = TRUE; - - if (out->callback) - out->callback(c, out->id, CA_ERROR_DESTROYED, out->userdata); - + if (!out->dead) + send_eos_msg(out, CA_ERROR_DESTROYED); out = out->next; - - ca_mutex_unlock(p->outstanding_mutex); - - gst_element_set_state(pipeline, GST_STATE_NULL); - gst_object_unref(GST_OBJECT(pipeline)); - - ca_mutex_lock(p->outstanding_mutex); } - if (p->semaphore_allocated) { - /* Now wait until all players are destroyed */ + /* Now that we've sent EOS for all pending players, append a + * message to wait for the mgr thread to exit */ + if (p->mgr_thread_running && p->semaphore_allocated) { + send_mgr_exit_msg(p); + p->signal_semaphore = TRUE; - while (p->outstanding) { + while (p->mgr_thread_running) { ca_mutex_unlock(p->outstanding_mutex); sem_wait(&p->semaphore); ca_mutex_lock(p->outstanding_mutex); @@ -165,6 +176,9 @@ int driver_destroy(ca_context *c) { ca_mutex_free(p->outstanding_mutex); } + if (p->mgr_bus) + g_object_unref(p->mgr_bus); + if (p->theme) ca_theme_data_free(p->theme); @@ -194,7 +208,24 @@ int driver_change_props(ca_context *c, ca_proplist *changed, ca_proplist *merged return CA_SUCCESS; } -static GstBusSyncReply bus_cb(GstBus *bus, GstMessage *message, gpointer data) { +static void +send_eos_msg(struct outstanding *out, int err) { + struct private *p; + GstMessage *m; + GstStructure *s; + + out->dead = TRUE; + out->err = err; + + p = PRIVATE(out->context); + s = gst_structure_new("application/eos", "info", G_TYPE_POINTER, out, NULL); + m = gst_message_new_application (GST_OBJECT (out->pipeline), s); + + gst_bus_post (p->mgr_bus, m); +} + +static GstBusSyncReply +bus_cb(GstBus *bus, GstMessage *message, gpointer data) { int err; struct outstanding *out; struct private *p; @@ -208,52 +239,28 @@ static GstBusSyncReply bus_cb(GstBus *bus, GstMessage *message, gpointer data) { switch (GST_MESSAGE_TYPE(message)) { /* for all elements */ - case GST_MESSAGE_UNKNOWN: - return GST_BUS_DROP; case GST_MESSAGE_ERROR: err = CA_ERROR_SYSTEM; break; - /* only from bin */ case GST_MESSAGE_EOS: + /* only respect EOS from the toplevel pipeline */ if (GST_OBJECT(out->pipeline) != GST_MESSAGE_SRC(message)) - return GST_BUS_DROP; + return GST_BUS_PASS; err = CA_SUCCESS; break; - case GST_MESSAGE_STATE_CHANGED: { - GstState pending; - - if (GST_OBJECT(out->pipeline) != GST_MESSAGE_SRC(message)) - return GST_BUS_DROP; - - gst_message_parse_state_changed(message, NULL, NULL, &pending); - /* g_debug (gst_element_state_get_name (pending)); */ - - if (pending == GST_STATE_NULL || pending == GST_STATE_VOID_PENDING) - return GST_BUS_PASS; - else - return GST_BUS_DROP; - break; - } default: - return GST_BUS_DROP; + return GST_BUS_PASS; } - if (!out->dead && out->callback) - out->callback(out->context, out->id, err, out->userdata); - + /* Bin finished playback: ask the manager thread to shut it + * down, since we can't from the sync message handler */ ca_mutex_lock(p->outstanding_mutex); - - CA_LLIST_REMOVE(struct outstanding, p->outstanding, out); - - if (!p->outstanding && p->signal_semaphore) - sem_post(&p->semaphore); - - outstanding_free(out); - + if (!out->dead) + send_eos_msg(out, err); ca_mutex_unlock(p->outstanding_mutex); - return GST_BUS_DROP; + return GST_BUS_PASS; } struct ca_sound_file { @@ -310,11 +317,87 @@ static void on_pad_added(GstElement *element, GstPad *pad, gboolean arg1, gpoint gst_caps_unref(caps); } +static void +send_mgr_exit_msg (struct private *p) { + GstMessage *m; + GstStructure *s; + + s = gst_structure_new("application/mgr-exit", NULL); + m = gst_message_new_application (NULL, s); + + gst_bus_post (p->mgr_bus, m); +} + +/* Global manager thread that shuts down GStreamer pipelines when ordered */ +static void* thread_func(void *userdata) { + struct private *p = userdata; + GstBus *bus = g_object_ref(p->mgr_bus); + + pthread_detach(pthread_self()); + + /* Pop messages from the manager bus until we see an exit command */ + do { + GstMessage *m = gst_bus_timed_pop(bus, GST_CLOCK_TIME_NONE); + const GstStructure *s; + const GValue *v; + struct outstanding *out; + + if (m == NULL) + break; + if (GST_MESSAGE_TYPE(m) != GST_MESSAGE_APPLICATION) { + gst_message_unref (m); + break; + } + + s = gst_message_get_structure(m); + if (gst_structure_has_name(s, "application/mgr-exit")) { + gst_message_unref (m); + break; + } + + /* Otherwise, this must be an EOS message for an outstanding pipe */ + ca_assert(gst_structure_has_name(s, "application/eos")); + v = gst_structure_get_value(s, "info"); + ca_assert(v); + out = g_value_get_pointer(v); + ca_assert(out); + + /* Set pipeline back to NULL to close things. By the time this + * completes, we can be sure bus_cb won't be called */ + if (gst_element_set_state(out->pipeline, GST_STATE_NULL) == + GST_STATE_CHANGE_FAILURE) { + gst_message_unref (m); + break; + } + if (out->callback) + out->callback(out->context, out->id, out->err, out->userdata); + + ca_mutex_lock(p->outstanding_mutex); + CA_LLIST_REMOVE(struct outstanding, p->outstanding, out); + outstanding_free(out); + ca_mutex_unlock(p->outstanding_mutex); + + gst_message_unref(m); + } while (TRUE); + + /* Signal the semaphore and exit */ + ca_mutex_lock(p->outstanding_mutex); + if (p->signal_semaphore) + sem_post(&p->semaphore); + p->mgr_thread_running = FALSE; + ca_mutex_unlock(p->outstanding_mutex); + + gst_bus_set_flushing(bus, TRUE); + g_object_unref (bus); + return NULL; +} + + int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_callback_t cb, void *userdata) { struct private *p; - struct outstanding *out = NULL; + struct outstanding *out; ca_sound_file *f; - GstElement *decodebin, *sink, *audioconvert, *audioresample, *bin; + GstElement *decodebin, *sink, *audioconvert, *audioresample, *abin; GstBus *bus; GstPad *audiopad; int ret; @@ -323,10 +406,13 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal ca_return_val_if_fail(proplist, CA_ERROR_INVALID); ca_return_val_if_fail(!userdata || cb, CA_ERROR_INVALID); + out = NULL; f = NULL; sink = NULL; decodebin = NULL; - + audioconvert = NULL; + audioresample = NULL; + abin = NULL; p = PRIVATE(c); if ((ret = ca_lookup_sound_with_callback(&f, ca_gst_sound_file_open, NULL, &p->theme, c->props, proplist)) < 0) @@ -344,43 +430,55 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal || !(decodebin = gst_element_factory_make("decodebin2", NULL)) || !(audioconvert = gst_element_factory_make("audioconvert", NULL)) || !(audioresample = gst_element_factory_make("audioresample", NULL)) - || !(sink = gst_element_factory_make("autoaudiosink", NULL))) { + || !(sink = gst_element_factory_make("autoaudiosink", NULL)) + || !(abin = gst_bin_new ("audiobin"))) { + + /* At this point, if there is a failure, free each plugin separately. */ + if (out->pipeline != NULL) + g_object_unref (out->pipeline); + if (decodebin != NULL) + g_object_unref(decodebin); + if (audioconvert != NULL) + g_object_unref(audioconvert); + if (audioresample != NULL) + g_object_unref(audioresample); + if (sink != NULL) + g_object_unref(sink); + if (abin != NULL) + g_object_unref(abin); + + ca_free(out); + ret = CA_ERROR_OOM; goto fail; } - bin = gst_bin_new("audiobin"); - - g_signal_connect(decodebin, "new-decoded-pad", G_CALLBACK (on_pad_added), bin); - bus = gst_pipeline_get_bus(GST_PIPELINE (out->pipeline)); gst_bus_set_sync_handler(bus, bus_cb, out); gst_object_unref(bus); - gst_bin_add_many(GST_BIN (out->pipeline), - f->fdsrc, decodebin, NULL); - - if (!gst_element_link(f->fdsrc, decodebin)) { - f->fdsrc = NULL; - decodebin = NULL; - audioconvert = NULL; - audioresample = NULL; - sink = NULL; - goto fail; - } - - gst_bin_add_many(GST_BIN (bin), audioconvert, audioresample, sink, NULL); + g_signal_connect(decodebin, "new-decoded-pad", + G_CALLBACK (on_pad_added), abin); + gst_bin_add_many(GST_BIN (abin), audioconvert, audioresample, sink, NULL); gst_element_link_many(audioconvert, audioresample, sink, NULL); audiopad = gst_element_get_pad(audioconvert, "sink"); - gst_element_add_pad(bin, gst_ghost_pad_new("sink", audiopad)); - + gst_element_add_pad(abin, gst_ghost_pad_new("sink", audiopad)); gst_object_unref(audiopad); - gst_bin_add(GST_BIN (out->pipeline), bin); + gst_bin_add_many(GST_BIN (out->pipeline), + f->fdsrc, decodebin, abin, NULL); + if (!gst_element_link(f->fdsrc, decodebin)) { + /* Bin now owns the fdsrc... */ + f->fdsrc = NULL; + + outstanding_free(out); + ret = CA_ERROR_OOM; + goto fail; + } + /* Bin now owns the fdsrc... */ + f->fdsrc = NULL; - decodebin = NULL; - sink = NULL; ca_free(f); f = NULL; @@ -388,11 +486,8 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal CA_LLIST_PREPEND(struct outstanding, p->outstanding, out); ca_mutex_unlock(p->outstanding_mutex); - if (gst_element_set_state(out->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { - ca_mutex_lock(p->outstanding_mutex); - CA_LLIST_REMOVE(struct outstanding, p->outstanding, out); - ca_mutex_unlock(p->outstanding_mutex); - + if (gst_element_set_state(out->pipeline, + GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { ret = CA_ERROR_NOTAVAILABLE; goto fail; } @@ -406,17 +501,6 @@ fail: if (f) ca_free(f); - if (sink) - gst_object_unref(sink); - - if (decodebin) - gst_object_unref(decodebin); - - if (out && out->pipeline) - gst_object_unref(out->pipeline); - - ca_free(out); - return ret; } @@ -431,31 +515,34 @@ int driver_cancel(ca_context *c, uint32_t id) { ca_mutex_lock(p->outstanding_mutex); - for (out = p->outstanding; out; out = out->next) { - GstElement *pipeline; + for (out = p->outstanding; out;/* out = out->next*/) { + struct outstanding *next; - if (out->id != id) + if (out->id != id || out->pipeline == NULL || out->dead == TRUE) + { + out = out->next; continue; + } - if (out->pipeline == NULL) - break; - + if (gst_element_set_state(out->pipeline, GST_STATE_NULL) == + GST_STATE_CHANGE_FAILURE) { + goto error; + } if (out->callback) out->callback(c, out->id, CA_ERROR_CANCELED, out->userdata); - - pipeline = out->pipeline; - out->dead = TRUE; - - ca_mutex_unlock(p->outstanding_mutex); - gst_element_set_state(out->pipeline, GST_STATE_NULL); - ca_mutex_lock(p->outstanding_mutex); - - gst_object_unref(pipeline); + next = out->next; + CA_LLIST_REMOVE(struct outstanding, p->outstanding, out); + outstanding_free(out); + out = next; } ca_mutex_unlock(p->outstanding_mutex); return CA_SUCCESS; + +error: + ca_mutex_unlock(p->outstanding_mutex); + return CA_ERROR_SYSTEM; } int driver_cache(ca_context *c, ca_proplist *proplist) { |