summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/gstreamer.c327
1 files changed, 207 insertions, 120 deletions
diff --git a/src/gstreamer.c b/src/gstreamer.c
index f008849..4173479 100644
--- a/src/gstreamer.c
+++ b/src/gstreamer.c
@@ -46,6 +46,7 @@ struct outstanding {
CA_LLIST_FIELDS(struct outstanding);
ca_bool_t dead;
uint32_t id;
+ int err;
ca_finish_callback_t callback;
void *userdata;
GstElement *pipeline;
@@ -54,26 +55,38 @@ struct outstanding {
struct private {
ca_theme_data *theme;
- ca_mutex *outstanding_mutex;
ca_bool_t signal_semaphore;
sem_t semaphore;
+
+ GstBus *mgr_bus;
+
+ /* Everything below protected by the outstanding_mutex */
+ ca_mutex *outstanding_mutex;
+ ca_bool_t mgr_thread_running;
ca_bool_t semaphore_allocated;
CA_LLIST_HEAD(struct outstanding, outstanding);
};
#define PRIVATE(c) ((struct private *) ((c)->private))
+static void* thread_func(void *userdata);
+static void send_eos_msg(struct outstanding *out, int err);
+static void send_mgr_exit_msg (struct private *p);
+
static void outstanding_free(struct outstanding *o) {
GstBus *bus;
ca_assert(o);
- bus = gst_pipeline_get_bus(GST_PIPELINE (o->pipeline));
- gst_bus_set_sync_handler(bus, NULL, NULL);
- gst_object_unref(bus);
+ if (o->pipeline) {
+ bus = gst_pipeline_get_bus(GST_PIPELINE (o->pipeline));
+ if (bus != NULL) {
+ gst_bus_set_sync_handler(bus, NULL, NULL);
+ gst_object_unref(bus);
+ }
- if (o->pipeline)
gst_object_unref(GST_OBJECT(o->pipeline));
+ }
ca_free(o);
}
@@ -81,6 +94,7 @@ static void outstanding_free(struct outstanding *o) {
int driver_open(ca_context *c) {
GError *error = NULL;
struct private *p;
+ pthread_t thread;
ca_return_val_if_fail(c, CA_ERROR_INVALID);
ca_return_val_if_fail(!PRIVATE(c), CA_ERROR_INVALID);
@@ -95,6 +109,7 @@ int driver_open(ca_context *c) {
if (!(p = ca_new0(struct private, 1)))
return CA_ERROR_OOM;
+ c->private = p;
if (!(p->outstanding_mutex = ca_mutex_new())) {
driver_destroy(c);
@@ -105,10 +120,21 @@ int driver_open(ca_context *c) {
driver_destroy(c);
return CA_ERROR_OOM;
}
-
p->semaphore_allocated = TRUE;
- c->private = p;
+ p->mgr_bus = gst_bus_new();
+ if (p->mgr_bus == NULL) {
+ driver_destroy(c);
+ return CA_ERROR_OOM;
+ }
+ gst_bus_set_flushing(p->mgr_bus, FALSE);
+
+ /* Give a reference to the bus to the mgr thread */
+ if (pthread_create(&thread, NULL, thread_func, p) < 0) {
+ driver_destroy(c);
+ return CA_ERROR_OOM;
+ }
+ p->mgr_thread_running = TRUE;
return CA_SUCCESS;
}
@@ -128,33 +154,18 @@ int driver_destroy(ca_context *c) {
/* Tell all player threads to terminate */
out = p->outstanding;
while (out) {
- GstElement *pipeline;
-
- if (out->dead) {
- out = out->next;
- continue;
- }
-
- pipeline = out->pipeline;
- out->dead = TRUE;
-
- if (out->callback)
- out->callback(c, out->id, CA_ERROR_DESTROYED, out->userdata);
-
+ if (!out->dead)
+ send_eos_msg(out, CA_ERROR_DESTROYED);
out = out->next;
-
- ca_mutex_unlock(p->outstanding_mutex);
-
- gst_element_set_state(pipeline, GST_STATE_NULL);
- gst_object_unref(GST_OBJECT(pipeline));
-
- ca_mutex_lock(p->outstanding_mutex);
}
- if (p->semaphore_allocated) {
- /* Now wait until all players are destroyed */
+ /* Now that we've sent EOS for all pending players, append a
+ * message to wait for the mgr thread to exit */
+ if (p->mgr_thread_running && p->semaphore_allocated) {
+ send_mgr_exit_msg(p);
+
p->signal_semaphore = TRUE;
- while (p->outstanding) {
+ while (p->mgr_thread_running) {
ca_mutex_unlock(p->outstanding_mutex);
sem_wait(&p->semaphore);
ca_mutex_lock(p->outstanding_mutex);
@@ -165,6 +176,9 @@ int driver_destroy(ca_context *c) {
ca_mutex_free(p->outstanding_mutex);
}
+ if (p->mgr_bus)
+ g_object_unref(p->mgr_bus);
+
if (p->theme)
ca_theme_data_free(p->theme);
@@ -194,7 +208,24 @@ int driver_change_props(ca_context *c, ca_proplist *changed, ca_proplist *merged
return CA_SUCCESS;
}
-static GstBusSyncReply bus_cb(GstBus *bus, GstMessage *message, gpointer data) {
+static void
+send_eos_msg(struct outstanding *out, int err) {
+ struct private *p;
+ GstMessage *m;
+ GstStructure *s;
+
+ out->dead = TRUE;
+ out->err = err;
+
+ p = PRIVATE(out->context);
+ s = gst_structure_new("application/eos", "info", G_TYPE_POINTER, out, NULL);
+ m = gst_message_new_application (GST_OBJECT (out->pipeline), s);
+
+ gst_bus_post (p->mgr_bus, m);
+}
+
+static GstBusSyncReply
+bus_cb(GstBus *bus, GstMessage *message, gpointer data) {
int err;
struct outstanding *out;
struct private *p;
@@ -208,52 +239,28 @@ static GstBusSyncReply bus_cb(GstBus *bus, GstMessage *message, gpointer data) {
switch (GST_MESSAGE_TYPE(message)) {
/* for all elements */
- case GST_MESSAGE_UNKNOWN:
- return GST_BUS_DROP;
case GST_MESSAGE_ERROR:
err = CA_ERROR_SYSTEM;
break;
- /* only from bin */
case GST_MESSAGE_EOS:
+ /* only respect EOS from the toplevel pipeline */
if (GST_OBJECT(out->pipeline) != GST_MESSAGE_SRC(message))
- return GST_BUS_DROP;
+ return GST_BUS_PASS;
err = CA_SUCCESS;
break;
- case GST_MESSAGE_STATE_CHANGED: {
- GstState pending;
-
- if (GST_OBJECT(out->pipeline) != GST_MESSAGE_SRC(message))
- return GST_BUS_DROP;
-
- gst_message_parse_state_changed(message, NULL, NULL, &pending);
- /* g_debug (gst_element_state_get_name (pending)); */
-
- if (pending == GST_STATE_NULL || pending == GST_STATE_VOID_PENDING)
- return GST_BUS_PASS;
- else
- return GST_BUS_DROP;
- break;
- }
default:
- return GST_BUS_DROP;
+ return GST_BUS_PASS;
}
- if (!out->dead && out->callback)
- out->callback(out->context, out->id, err, out->userdata);
-
+ /* Bin finished playback: ask the manager thread to shut it
+ * down, since we can't from the sync message handler */
ca_mutex_lock(p->outstanding_mutex);
-
- CA_LLIST_REMOVE(struct outstanding, p->outstanding, out);
-
- if (!p->outstanding && p->signal_semaphore)
- sem_post(&p->semaphore);
-
- outstanding_free(out);
-
+ if (!out->dead)
+ send_eos_msg(out, err);
ca_mutex_unlock(p->outstanding_mutex);
- return GST_BUS_DROP;
+ return GST_BUS_PASS;
}
struct ca_sound_file {
@@ -310,11 +317,87 @@ static void on_pad_added(GstElement *element, GstPad *pad, gboolean arg1, gpoint
gst_caps_unref(caps);
}
+static void
+send_mgr_exit_msg (struct private *p) {
+ GstMessage *m;
+ GstStructure *s;
+
+ s = gst_structure_new("application/mgr-exit", NULL);
+ m = gst_message_new_application (NULL, s);
+
+ gst_bus_post (p->mgr_bus, m);
+}
+
+/* Global manager thread that shuts down GStreamer pipelines when ordered */
+static void* thread_func(void *userdata) {
+ struct private *p = userdata;
+ GstBus *bus = g_object_ref(p->mgr_bus);
+
+ pthread_detach(pthread_self());
+
+ /* Pop messages from the manager bus until we see an exit command */
+ do {
+ GstMessage *m = gst_bus_timed_pop(bus, GST_CLOCK_TIME_NONE);
+ const GstStructure *s;
+ const GValue *v;
+ struct outstanding *out;
+
+ if (m == NULL)
+ break;
+ if (GST_MESSAGE_TYPE(m) != GST_MESSAGE_APPLICATION) {
+ gst_message_unref (m);
+ break;
+ }
+
+ s = gst_message_get_structure(m);
+ if (gst_structure_has_name(s, "application/mgr-exit")) {
+ gst_message_unref (m);
+ break;
+ }
+
+ /* Otherwise, this must be an EOS message for an outstanding pipe */
+ ca_assert(gst_structure_has_name(s, "application/eos"));
+ v = gst_structure_get_value(s, "info");
+ ca_assert(v);
+ out = g_value_get_pointer(v);
+ ca_assert(out);
+
+ /* Set pipeline back to NULL to close things. By the time this
+ * completes, we can be sure bus_cb won't be called */
+ if (gst_element_set_state(out->pipeline, GST_STATE_NULL) ==
+ GST_STATE_CHANGE_FAILURE) {
+ gst_message_unref (m);
+ break;
+ }
+ if (out->callback)
+ out->callback(out->context, out->id, out->err, out->userdata);
+
+ ca_mutex_lock(p->outstanding_mutex);
+ CA_LLIST_REMOVE(struct outstanding, p->outstanding, out);
+ outstanding_free(out);
+ ca_mutex_unlock(p->outstanding_mutex);
+
+ gst_message_unref(m);
+ } while (TRUE);
+
+ /* Signal the semaphore and exit */
+ ca_mutex_lock(p->outstanding_mutex);
+ if (p->signal_semaphore)
+ sem_post(&p->semaphore);
+ p->mgr_thread_running = FALSE;
+ ca_mutex_unlock(p->outstanding_mutex);
+
+ gst_bus_set_flushing(bus, TRUE);
+ g_object_unref (bus);
+ return NULL;
+}
+
+
int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_callback_t cb, void *userdata) {
struct private *p;
- struct outstanding *out = NULL;
+ struct outstanding *out;
ca_sound_file *f;
- GstElement *decodebin, *sink, *audioconvert, *audioresample, *bin;
+ GstElement *decodebin, *sink, *audioconvert, *audioresample, *abin;
GstBus *bus;
GstPad *audiopad;
int ret;
@@ -323,10 +406,13 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal
ca_return_val_if_fail(proplist, CA_ERROR_INVALID);
ca_return_val_if_fail(!userdata || cb, CA_ERROR_INVALID);
+ out = NULL;
f = NULL;
sink = NULL;
decodebin = NULL;
-
+ audioconvert = NULL;
+ audioresample = NULL;
+ abin = NULL;
p = PRIVATE(c);
if ((ret = ca_lookup_sound_with_callback(&f, ca_gst_sound_file_open, NULL, &p->theme, c->props, proplist)) < 0)
@@ -344,43 +430,55 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal
|| !(decodebin = gst_element_factory_make("decodebin2", NULL))
|| !(audioconvert = gst_element_factory_make("audioconvert", NULL))
|| !(audioresample = gst_element_factory_make("audioresample", NULL))
- || !(sink = gst_element_factory_make("autoaudiosink", NULL))) {
+ || !(sink = gst_element_factory_make("autoaudiosink", NULL))
+ || !(abin = gst_bin_new ("audiobin"))) {
+
+ /* At this point, if there is a failure, free each plugin separately. */
+ if (out->pipeline != NULL)
+ g_object_unref (out->pipeline);
+ if (decodebin != NULL)
+ g_object_unref(decodebin);
+ if (audioconvert != NULL)
+ g_object_unref(audioconvert);
+ if (audioresample != NULL)
+ g_object_unref(audioresample);
+ if (sink != NULL)
+ g_object_unref(sink);
+ if (abin != NULL)
+ g_object_unref(abin);
+
+ ca_free(out);
+
ret = CA_ERROR_OOM;
goto fail;
}
- bin = gst_bin_new("audiobin");
-
- g_signal_connect(decodebin, "new-decoded-pad", G_CALLBACK (on_pad_added), bin);
-
bus = gst_pipeline_get_bus(GST_PIPELINE (out->pipeline));
gst_bus_set_sync_handler(bus, bus_cb, out);
gst_object_unref(bus);
- gst_bin_add_many(GST_BIN (out->pipeline),
- f->fdsrc, decodebin, NULL);
-
- if (!gst_element_link(f->fdsrc, decodebin)) {
- f->fdsrc = NULL;
- decodebin = NULL;
- audioconvert = NULL;
- audioresample = NULL;
- sink = NULL;
- goto fail;
- }
-
- gst_bin_add_many(GST_BIN (bin), audioconvert, audioresample, sink, NULL);
+ g_signal_connect(decodebin, "new-decoded-pad",
+ G_CALLBACK (on_pad_added), abin);
+ gst_bin_add_many(GST_BIN (abin), audioconvert, audioresample, sink, NULL);
gst_element_link_many(audioconvert, audioresample, sink, NULL);
audiopad = gst_element_get_pad(audioconvert, "sink");
- gst_element_add_pad(bin, gst_ghost_pad_new("sink", audiopad));
-
+ gst_element_add_pad(abin, gst_ghost_pad_new("sink", audiopad));
gst_object_unref(audiopad);
- gst_bin_add(GST_BIN (out->pipeline), bin);
+ gst_bin_add_many(GST_BIN (out->pipeline),
+ f->fdsrc, decodebin, abin, NULL);
+ if (!gst_element_link(f->fdsrc, decodebin)) {
+ /* Bin now owns the fdsrc... */
+ f->fdsrc = NULL;
+
+ outstanding_free(out);
+ ret = CA_ERROR_OOM;
+ goto fail;
+ }
+ /* Bin now owns the fdsrc... */
+ f->fdsrc = NULL;
- decodebin = NULL;
- sink = NULL;
ca_free(f);
f = NULL;
@@ -388,11 +486,8 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal
CA_LLIST_PREPEND(struct outstanding, p->outstanding, out);
ca_mutex_unlock(p->outstanding_mutex);
- if (gst_element_set_state(out->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
- ca_mutex_lock(p->outstanding_mutex);
- CA_LLIST_REMOVE(struct outstanding, p->outstanding, out);
- ca_mutex_unlock(p->outstanding_mutex);
-
+ if (gst_element_set_state(out->pipeline,
+ GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
ret = CA_ERROR_NOTAVAILABLE;
goto fail;
}
@@ -406,17 +501,6 @@ fail:
if (f)
ca_free(f);
- if (sink)
- gst_object_unref(sink);
-
- if (decodebin)
- gst_object_unref(decodebin);
-
- if (out && out->pipeline)
- gst_object_unref(out->pipeline);
-
- ca_free(out);
-
return ret;
}
@@ -431,31 +515,34 @@ int driver_cancel(ca_context *c, uint32_t id) {
ca_mutex_lock(p->outstanding_mutex);
- for (out = p->outstanding; out; out = out->next) {
- GstElement *pipeline;
+ for (out = p->outstanding; out;/* out = out->next*/) {
+ struct outstanding *next;
- if (out->id != id)
+ if (out->id != id || out->pipeline == NULL || out->dead == TRUE)
+ {
+ out = out->next;
continue;
+ }
- if (out->pipeline == NULL)
- break;
-
+ if (gst_element_set_state(out->pipeline, GST_STATE_NULL) ==
+ GST_STATE_CHANGE_FAILURE) {
+ goto error;
+ }
if (out->callback)
out->callback(c, out->id, CA_ERROR_CANCELED, out->userdata);
-
- pipeline = out->pipeline;
- out->dead = TRUE;
-
- ca_mutex_unlock(p->outstanding_mutex);
- gst_element_set_state(out->pipeline, GST_STATE_NULL);
- ca_mutex_lock(p->outstanding_mutex);
-
- gst_object_unref(pipeline);
+ next = out->next;
+ CA_LLIST_REMOVE(struct outstanding, p->outstanding, out);
+ outstanding_free(out);
+ out = next;
}
ca_mutex_unlock(p->outstanding_mutex);
return CA_SUCCESS;
+
+error:
+ ca_mutex_unlock(p->outstanding_mutex);
+ return CA_ERROR_SYSTEM;
}
int driver_cache(ca_context *c, ca_proplist *proplist) {