diff options
Diffstat (limited to 'src/gstreamer.c')
| -rw-r--r-- | src/gstreamer.c | 327 | 
1 files changed, 207 insertions, 120 deletions
diff --git a/src/gstreamer.c b/src/gstreamer.c index f008849..4173479 100644 --- a/src/gstreamer.c +++ b/src/gstreamer.c @@ -46,6 +46,7 @@ struct outstanding {      CA_LLIST_FIELDS(struct outstanding);      ca_bool_t dead;      uint32_t id; +    int err;      ca_finish_callback_t callback;      void *userdata;      GstElement *pipeline; @@ -54,26 +55,38 @@ struct outstanding {  struct private {      ca_theme_data *theme; -    ca_mutex *outstanding_mutex;      ca_bool_t signal_semaphore;      sem_t semaphore; + +    GstBus *mgr_bus; + +    /* Everything below protected by the outstanding_mutex */ +    ca_mutex *outstanding_mutex; +    ca_bool_t mgr_thread_running;      ca_bool_t semaphore_allocated;      CA_LLIST_HEAD(struct outstanding, outstanding);  };  #define PRIVATE(c) ((struct private *) ((c)->private)) +static void* thread_func(void *userdata); +static void send_eos_msg(struct outstanding *out, int err); +static void send_mgr_exit_msg (struct private *p); +  static void outstanding_free(struct outstanding *o) {      GstBus *bus;      ca_assert(o); -    bus = gst_pipeline_get_bus(GST_PIPELINE (o->pipeline)); -    gst_bus_set_sync_handler(bus, NULL, NULL); -    gst_object_unref(bus); +    if (o->pipeline) { +        bus = gst_pipeline_get_bus(GST_PIPELINE (o->pipeline)); +        if (bus != NULL) { +           gst_bus_set_sync_handler(bus, NULL, NULL); +           gst_object_unref(bus); +        } -    if (o->pipeline)          gst_object_unref(GST_OBJECT(o->pipeline)); +    }      ca_free(o);  } @@ -81,6 +94,7 @@ static void outstanding_free(struct outstanding *o) {  int driver_open(ca_context *c) {      GError *error = NULL;      struct private *p; +    pthread_t thread;      ca_return_val_if_fail(c, CA_ERROR_INVALID);      ca_return_val_if_fail(!PRIVATE(c), CA_ERROR_INVALID); @@ -95,6 +109,7 @@ int driver_open(ca_context *c) {      if (!(p = ca_new0(struct private, 1)))          return CA_ERROR_OOM; +    c->private = p;      if (!(p->outstanding_mutex = ca_mutex_new())) {          driver_destroy(c); @@ -105,10 +120,21 @@ int driver_open(ca_context *c) {          driver_destroy(c);          return CA_ERROR_OOM;      } -      p->semaphore_allocated = TRUE; -    c->private = p; +    p->mgr_bus = gst_bus_new(); +    if (p->mgr_bus == NULL) { +        driver_destroy(c); +        return CA_ERROR_OOM; +    } +    gst_bus_set_flushing(p->mgr_bus, FALSE); + +    /* Give a reference to the bus to the mgr thread */ +    if (pthread_create(&thread, NULL, thread_func, p) < 0) { +        driver_destroy(c); +        return CA_ERROR_OOM; +    } +    p->mgr_thread_running = TRUE;      return CA_SUCCESS;  } @@ -128,33 +154,18 @@ int driver_destroy(ca_context *c) {          /* Tell all player threads to terminate */          out = p->outstanding;          while (out) { -            GstElement *pipeline; - -            if (out->dead) { -                out = out->next; -                continue; -            } - -            pipeline = out->pipeline; -            out->dead = TRUE; - -            if (out->callback) -                out->callback(c, out->id, CA_ERROR_DESTROYED, out->userdata); - +            if (!out->dead) +                send_eos_msg(out, CA_ERROR_DESTROYED);              out = out->next; - -            ca_mutex_unlock(p->outstanding_mutex); - -            gst_element_set_state(pipeline, GST_STATE_NULL); -            gst_object_unref(GST_OBJECT(pipeline)); - -            ca_mutex_lock(p->outstanding_mutex);          } -        if (p->semaphore_allocated) { -            /* Now wait until all players are destroyed */ +        /* Now that we've sent EOS for all pending players, append a +         * message to wait for the mgr thread to exit */ +        if (p->mgr_thread_running && p->semaphore_allocated) { +            send_mgr_exit_msg(p); +              p->signal_semaphore = TRUE; -            while (p->outstanding) { +            while (p->mgr_thread_running) {                  ca_mutex_unlock(p->outstanding_mutex);                  sem_wait(&p->semaphore);                  ca_mutex_lock(p->outstanding_mutex); @@ -165,6 +176,9 @@ int driver_destroy(ca_context *c) {          ca_mutex_free(p->outstanding_mutex);      } +    if (p->mgr_bus) +        g_object_unref(p->mgr_bus); +      if (p->theme)          ca_theme_data_free(p->theme); @@ -194,7 +208,24 @@ int driver_change_props(ca_context *c, ca_proplist *changed, ca_proplist *merged      return CA_SUCCESS;  } -static GstBusSyncReply bus_cb(GstBus *bus, GstMessage *message, gpointer data) { +static void +send_eos_msg(struct outstanding *out, int err) { +    struct private *p; +    GstMessage *m; +    GstStructure *s; + +    out->dead = TRUE; +    out->err = err; + +    p = PRIVATE(out->context); +    s = gst_structure_new("application/eos", "info", G_TYPE_POINTER, out, NULL); +    m = gst_message_new_application (GST_OBJECT (out->pipeline), s); + +    gst_bus_post (p->mgr_bus, m); +} + +static GstBusSyncReply +bus_cb(GstBus *bus, GstMessage *message, gpointer data) {      int err;      struct outstanding *out;      struct private *p; @@ -208,52 +239,28 @@ static GstBusSyncReply bus_cb(GstBus *bus, GstMessage *message, gpointer data) {      switch (GST_MESSAGE_TYPE(message)) {          /* for all elements */ -        case GST_MESSAGE_UNKNOWN: -            return GST_BUS_DROP;          case GST_MESSAGE_ERROR:              err = CA_ERROR_SYSTEM;              break; -            /* only from bin */          case GST_MESSAGE_EOS: +            /* only respect EOS from the toplevel pipeline */              if (GST_OBJECT(out->pipeline) != GST_MESSAGE_SRC(message)) -                return GST_BUS_DROP; +              return GST_BUS_PASS;              err = CA_SUCCESS;              break; -        case GST_MESSAGE_STATE_CHANGED: { -            GstState pending; - -            if (GST_OBJECT(out->pipeline) != GST_MESSAGE_SRC(message)) -                return GST_BUS_DROP; - -            gst_message_parse_state_changed(message, NULL, NULL, &pending); -            /* g_debug (gst_element_state_get_name (pending)); */ - -            if (pending == GST_STATE_NULL || pending == GST_STATE_VOID_PENDING) -                return GST_BUS_PASS; -            else -                return GST_BUS_DROP; -            break; -        }          default: -            return GST_BUS_DROP; +            return GST_BUS_PASS;      } -    if (!out->dead && out->callback) -        out->callback(out->context, out->id, err, out->userdata); - +    /* Bin finished playback: ask the manager thread to shut it +     * down, since we can't from the sync message handler */      ca_mutex_lock(p->outstanding_mutex); - -    CA_LLIST_REMOVE(struct outstanding, p->outstanding, out); - -    if (!p->outstanding && p->signal_semaphore) -        sem_post(&p->semaphore); - -    outstanding_free(out); - +    if (!out->dead) +        send_eos_msg(out, err);      ca_mutex_unlock(p->outstanding_mutex); -    return GST_BUS_DROP; +    return GST_BUS_PASS;  }  struct ca_sound_file { @@ -310,11 +317,87 @@ static void on_pad_added(GstElement *element, GstPad *pad, gboolean arg1, gpoint      gst_caps_unref(caps);  } +static void +send_mgr_exit_msg (struct private *p) { +    GstMessage *m; +    GstStructure *s; + +    s = gst_structure_new("application/mgr-exit", NULL); +    m = gst_message_new_application (NULL, s); + +    gst_bus_post (p->mgr_bus, m); +} + +/* Global manager thread that shuts down GStreamer pipelines when ordered */ +static void* thread_func(void *userdata) { +    struct private *p = userdata; +    GstBus *bus = g_object_ref(p->mgr_bus); + +    pthread_detach(pthread_self()); + +    /* Pop messages from the manager bus until we see an exit command */ +    do { +        GstMessage *m = gst_bus_timed_pop(bus, GST_CLOCK_TIME_NONE); +        const GstStructure *s; +        const GValue *v; +        struct outstanding *out; + +        if (m == NULL) +            break; +        if (GST_MESSAGE_TYPE(m) != GST_MESSAGE_APPLICATION) { +            gst_message_unref (m); +            break; +        } + +        s = gst_message_get_structure(m); +        if (gst_structure_has_name(s, "application/mgr-exit")) { +            gst_message_unref (m); +            break; +        } + +        /* Otherwise, this must be an EOS message for an outstanding pipe */ +        ca_assert(gst_structure_has_name(s, "application/eos")); +        v  = gst_structure_get_value(s, "info"); +        ca_assert(v); +        out = g_value_get_pointer(v); +        ca_assert(out); + +        /* Set pipeline back to NULL to close things. By the time this +         * completes, we can be sure bus_cb won't be called */ +        if (gst_element_set_state(out->pipeline, GST_STATE_NULL) == +                GST_STATE_CHANGE_FAILURE) { +            gst_message_unref (m); +            break; +        } +        if (out->callback) +            out->callback(out->context, out->id, out->err, out->userdata); + +        ca_mutex_lock(p->outstanding_mutex); +        CA_LLIST_REMOVE(struct outstanding, p->outstanding, out); +        outstanding_free(out); +        ca_mutex_unlock(p->outstanding_mutex); + +        gst_message_unref(m); +    } while (TRUE); + +    /* Signal the semaphore and exit */ +    ca_mutex_lock(p->outstanding_mutex); +    if (p->signal_semaphore) +        sem_post(&p->semaphore); +    p->mgr_thread_running = FALSE; +    ca_mutex_unlock(p->outstanding_mutex); + +    gst_bus_set_flushing(bus, TRUE); +    g_object_unref (bus); +    return NULL; +} + +  int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_callback_t cb, void *userdata) {      struct private *p; -    struct outstanding *out = NULL; +    struct outstanding *out;      ca_sound_file *f; -    GstElement *decodebin, *sink, *audioconvert, *audioresample, *bin; +    GstElement *decodebin, *sink, *audioconvert, *audioresample, *abin;      GstBus *bus;      GstPad *audiopad;      int ret; @@ -323,10 +406,13 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal      ca_return_val_if_fail(proplist, CA_ERROR_INVALID);      ca_return_val_if_fail(!userdata || cb, CA_ERROR_INVALID); +    out = NULL;      f = NULL;      sink = NULL;      decodebin = NULL; - +    audioconvert = NULL; +    audioresample = NULL; +    abin = NULL;      p = PRIVATE(c);      if ((ret = ca_lookup_sound_with_callback(&f, ca_gst_sound_file_open, NULL, &p->theme, c->props, proplist)) < 0) @@ -344,43 +430,55 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal          || !(decodebin = gst_element_factory_make("decodebin2", NULL))          || !(audioconvert = gst_element_factory_make("audioconvert", NULL))          || !(audioresample = gst_element_factory_make("audioresample", NULL)) -        || !(sink = gst_element_factory_make("autoaudiosink", NULL))) { +        || !(sink = gst_element_factory_make("autoaudiosink", NULL)) +        || !(abin = gst_bin_new ("audiobin"))) { + +        /* At this point, if there is a failure, free each plugin separately. */ +        if (out->pipeline != NULL) +           g_object_unref (out->pipeline); +        if (decodebin != NULL) +           g_object_unref(decodebin); +        if (audioconvert != NULL) +           g_object_unref(audioconvert); +        if (audioresample != NULL) +           g_object_unref(audioresample); +        if (sink != NULL) +           g_object_unref(sink); +        if (abin != NULL) +           g_object_unref(abin); + +        ca_free(out); +          ret = CA_ERROR_OOM;          goto fail;      } -    bin = gst_bin_new("audiobin"); - -    g_signal_connect(decodebin, "new-decoded-pad", G_CALLBACK (on_pad_added), bin); -      bus = gst_pipeline_get_bus(GST_PIPELINE (out->pipeline));      gst_bus_set_sync_handler(bus, bus_cb, out);      gst_object_unref(bus); -    gst_bin_add_many(GST_BIN (out->pipeline), -                     f->fdsrc, decodebin, NULL); - -    if (!gst_element_link(f->fdsrc, decodebin)) { -        f->fdsrc = NULL; -        decodebin = NULL; -        audioconvert = NULL; -        audioresample = NULL; -        sink = NULL; -        goto fail; -    } - -    gst_bin_add_many(GST_BIN (bin), audioconvert, audioresample, sink, NULL); +    g_signal_connect(decodebin, "new-decoded-pad", +                     G_CALLBACK (on_pad_added), abin); +    gst_bin_add_many(GST_BIN (abin), audioconvert, audioresample, sink, NULL);      gst_element_link_many(audioconvert, audioresample, sink, NULL);      audiopad = gst_element_get_pad(audioconvert, "sink"); -    gst_element_add_pad(bin, gst_ghost_pad_new("sink", audiopad)); - +    gst_element_add_pad(abin, gst_ghost_pad_new("sink", audiopad));      gst_object_unref(audiopad); -    gst_bin_add(GST_BIN (out->pipeline), bin); +    gst_bin_add_many(GST_BIN (out->pipeline), +                     f->fdsrc, decodebin, abin, NULL); +    if (!gst_element_link(f->fdsrc, decodebin)) { +        /* Bin now owns the fdsrc... */ +        f->fdsrc = NULL; + +        outstanding_free(out); +        ret = CA_ERROR_OOM; +        goto fail; +    } +    /* Bin now owns the fdsrc... */ +    f->fdsrc = NULL; -    decodebin = NULL; -    sink = NULL;      ca_free(f);      f = NULL; @@ -388,11 +486,8 @@ int driver_play(ca_context *c, uint32_t id, ca_proplist *proplist, ca_finish_cal      CA_LLIST_PREPEND(struct outstanding, p->outstanding, out);      ca_mutex_unlock(p->outstanding_mutex); -    if (gst_element_set_state(out->pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) { -        ca_mutex_lock(p->outstanding_mutex); -        CA_LLIST_REMOVE(struct outstanding, p->outstanding, out); -        ca_mutex_unlock(p->outstanding_mutex); - +    if (gst_element_set_state(out->pipeline, +                              GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {          ret = CA_ERROR_NOTAVAILABLE;          goto fail;      } @@ -406,17 +501,6 @@ fail:      if (f)          ca_free(f); -    if (sink) -        gst_object_unref(sink); - -    if (decodebin) -        gst_object_unref(decodebin); - -    if (out && out->pipeline) -        gst_object_unref(out->pipeline); - -    ca_free(out); -      return ret;  } @@ -431,31 +515,34 @@ int driver_cancel(ca_context *c, uint32_t id) {      ca_mutex_lock(p->outstanding_mutex); -    for (out = p->outstanding; out; out = out->next) { -        GstElement *pipeline; +    for (out = p->outstanding; out;/* out = out->next*/) { +        struct outstanding *next; -        if (out->id != id) +        if (out->id != id || out->pipeline == NULL || out->dead == TRUE) +        { +            out = out->next;              continue; +        } -        if (out->pipeline == NULL) -            break; - +        if (gst_element_set_state(out->pipeline, GST_STATE_NULL) == +                GST_STATE_CHANGE_FAILURE) { +            goto error; +        }          if (out->callback)              out->callback(c, out->id, CA_ERROR_CANCELED, out->userdata); - -        pipeline = out->pipeline; -        out->dead = TRUE; - -        ca_mutex_unlock(p->outstanding_mutex); -        gst_element_set_state(out->pipeline, GST_STATE_NULL); -        ca_mutex_lock(p->outstanding_mutex); - -        gst_object_unref(pipeline); +        next = out->next; +        CA_LLIST_REMOVE(struct outstanding, p->outstanding, out); +        outstanding_free(out); +        out = next;      }      ca_mutex_unlock(p->outstanding_mutex);      return CA_SUCCESS; + +error: +    ca_mutex_unlock(p->outstanding_mutex); +    return CA_ERROR_SYSTEM;  }  int driver_cache(ca_context *c, ca_proplist *proplist) {  | 
