summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@collabora.co.uk>2009-04-08 13:52:41 +0200
committerWim Taymans <wim.taymans@collabora.co.uk>2009-04-09 17:26:20 +0200
commit6bc6cafcc6086eb3072f4f1b4936393d9280fbb9 (patch)
tree8198f67badfe74d74d6fbdd75b06ac59c031ce4c
parent28d733d53b241a0c4a80b30f32e70b4750adf573 (diff)
pulsesink: rewrite pulsesink
Derive from BaseAudioSink and implement our custom ringbuffer that maps to the internal pulseaudio ringbuffer.
-rw-r--r--ext/pulse/pulsesink.c2021
-rw-r--r--ext/pulse/pulsesink.h18
2 files changed, 1269 insertions, 770 deletions
diff --git a/ext/pulse/pulsesink.c b/ext/pulse/pulsesink.c
index 5064f6f4..6fcd3378 100644
--- a/ext/pulse/pulsesink.c
+++ b/ext/pulse/pulsesink.c
@@ -1,7 +1,7 @@
-/*
- * GStreamer pulseaudio plugin
+/* GStreamer pulseaudio plugin
*
* Copyright (c) 2004-2008 Lennart Poettering
+ * (c) 2009 Wim Taymans
*
* gst-pulse is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
@@ -66,38 +66,1067 @@ enum
PROP_VOLUME
};
-static void gst_pulsesink_destroy_stream (GstPulseSink * pulsesink);
+#define GST_TYPE_PULSERING_BUFFER \
+ (gst_pulseringbuffer_get_type())
+#define GST_PULSERING_BUFFER(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PULSERING_BUFFER,GstPulseRingBuffer))
+#define GST_PULSERING_BUFFER_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PULSERING_BUFFER,GstPulseRingBufferClass))
+#define GST_PULSERING_BUFFER_GET_CLASS(obj) \
+ (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_PULSERING_BUFFER, GstPulseRingBufferClass))
+#define GST_PULSERING_BUFFER_CAST(obj) \
+ ((GstPulseRingBuffer *)obj)
+#define GST_IS_PULSERING_BUFFER(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PULSERING_BUFFER))
+#define GST_IS_PULSERING_BUFFER_CLASS(klass)\
+ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PULSERING_BUFFER))
+
+typedef struct _GstPulseRingBuffer GstPulseRingBuffer;
+typedef struct _GstPulseRingBufferClass GstPulseRingBufferClass;
+
+/* We keep a custom ringbuffer that is backed up by data allocated by
+ * pulseaudio. We must also overide the commit function to write into
+ * pulseaudio memory instead. */
+struct _GstPulseRingBuffer
+{
+ GstRingBuffer object;
-static void gst_pulsesink_destroy_context (GstPulseSink * pulsesink);
+ gchar *stream_name;
-static void gst_pulsesink_set_property (GObject * object, guint prop_id,
- const GValue * value, GParamSpec * pspec);
-static void gst_pulsesink_get_property (GObject * object, guint prop_id,
- GValue * value, GParamSpec * pspec);
-static void gst_pulsesink_finalize (GObject * object);
+ pa_context *context;
+ pa_stream *stream;
-static gboolean gst_pulsesink_open (GstAudioSink * asink);
+ pa_sample_spec sample_spec;
-static gboolean gst_pulsesink_close (GstAudioSink * asink);
+ gboolean corked;
+ gboolean in_commit;
+ gboolean paused;
+ guint required;
+};
-static gboolean gst_pulsesink_prepare (GstAudioSink * asink,
+struct _GstPulseRingBufferClass
+{
+ GstRingBufferClass parent_class;
+};
+
+static void gst_pulseringbuffer_class_init (GstPulseRingBufferClass * klass);
+static void gst_pulseringbuffer_init (GstPulseRingBuffer * ringbuffer,
+ GstPulseRingBufferClass * klass);
+static void gst_pulseringbuffer_finalize (GObject * object);
+
+static GstRingBufferClass *ring_parent_class = NULL;
+
+static gboolean gst_pulseringbuffer_open_device (GstRingBuffer * buf);
+static gboolean gst_pulseringbuffer_close_device (GstRingBuffer * buf);
+static gboolean gst_pulseringbuffer_acquire (GstRingBuffer * buf,
GstRingBufferSpec * spec);
-static gboolean gst_pulsesink_unprepare (GstAudioSink * asink);
+static gboolean gst_pulseringbuffer_release (GstRingBuffer * buf);
+static gboolean gst_pulseringbuffer_start (GstRingBuffer * buf);
+static gboolean gst_pulseringbuffer_pause (GstRingBuffer * buf);
+static gboolean gst_pulseringbuffer_stop (GstRingBuffer * buf);
+static gboolean gst_pulseringbuffer_activate (GstRingBuffer * buf,
+ gboolean active);
+static guint gst_pulseringbuffer_commit (GstRingBuffer * buf,
+ guint64 * sample, guchar * data, gint in_samples, gint out_samples,
+ gint * accum);
+
+/* ringbuffer abstract base class */
+static GType
+gst_pulseringbuffer_get_type (void)
+{
+ static GType ringbuffer_type = 0;
+
+ if (!ringbuffer_type) {
+ static const GTypeInfo ringbuffer_info = {
+ sizeof (GstPulseRingBufferClass),
+ NULL,
+ NULL,
+ (GClassInitFunc) gst_pulseringbuffer_class_init,
+ NULL,
+ NULL,
+ sizeof (GstPulseRingBuffer),
+ 0,
+ (GInstanceInitFunc) gst_pulseringbuffer_init,
+ NULL
+ };
+
+ ringbuffer_type =
+ g_type_register_static (GST_TYPE_RING_BUFFER, "GstPulseSinkRingBuffer",
+ &ringbuffer_info, 0);
+ }
+ return ringbuffer_type;
+}
+
+static void
+gst_pulseringbuffer_class_init (GstPulseRingBufferClass * klass)
+{
+ GObjectClass *gobject_class;
+ GstObjectClass *gstobject_class;
+ GstRingBufferClass *gstringbuffer_class;
+
+ gobject_class = (GObjectClass *) klass;
+ gstobject_class = (GstObjectClass *) klass;
+ gstringbuffer_class = (GstRingBufferClass *) klass;
+
+ ring_parent_class = g_type_class_peek_parent (klass);
+
+ gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_finalize);
+
+ gstringbuffer_class->open_device =
+ GST_DEBUG_FUNCPTR (gst_pulseringbuffer_open_device);
+ gstringbuffer_class->close_device =
+ GST_DEBUG_FUNCPTR (gst_pulseringbuffer_close_device);
+ gstringbuffer_class->acquire =
+ GST_DEBUG_FUNCPTR (gst_pulseringbuffer_acquire);
+ gstringbuffer_class->release =
+ GST_DEBUG_FUNCPTR (gst_pulseringbuffer_release);
+ gstringbuffer_class->start = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start);
+ gstringbuffer_class->pause = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_pause);
+ gstringbuffer_class->resume = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start);
+ gstringbuffer_class->stop = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_stop);
+
+ gstringbuffer_class->activate =
+ GST_DEBUG_FUNCPTR (gst_pulseringbuffer_activate);
+ gstringbuffer_class->commit = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_commit);
+}
-static guint gst_pulsesink_write (GstAudioSink * asink, gpointer data,
- guint length);
-static guint gst_pulsesink_delay (GstAudioSink * asink);
+static void
+gst_pulseringbuffer_init (GstPulseRingBuffer * pbuf,
+ GstPulseRingBufferClass * g_class)
+{
+ pbuf->stream_name = NULL;
+ pbuf->context = NULL;
+ pbuf->stream = NULL;
-static void gst_pulsesink_reset (GstAudioSink * asink);
+#if HAVE_PULSE_0_9_13
+ pa_sample_spec_init (&pbuf->sample_spec);
+#else
+ pbuf->sample_spec.format = PA_SAMPLE_INVALID;
+ pbuf->sample_spec.rate = 0;
+ pbuf->sample_spec.channels = 0;
+#endif
-static gboolean gst_pulsesink_event (GstBaseSink * sink, GstEvent * event);
+ pbuf->corked = TRUE;
+}
-static GstStateChangeReturn gst_pulsesink_change_state (GstElement *
- element, GstStateChange transition);
+static void
+gst_pulsering_destroy_stream (GstPulseRingBuffer * pbuf)
+{
+ if (pbuf->stream) {
+ pa_stream_disconnect (pbuf->stream);
-static void gst_pulsesink_init_interfaces (GType type);
+ /* Make sure we don't get any further callbacks */
+ pa_stream_set_state_callback (pbuf->stream, NULL, NULL);
+ pa_stream_set_write_callback (pbuf->stream, NULL, NULL);
+ pa_stream_set_latency_update_callback (pbuf->stream, NULL, NULL);
+
+ pa_stream_unref (pbuf->stream);
+ pbuf->stream = NULL;
+ }
+
+ g_free (pbuf->stream_name);
+ pbuf->stream_name = NULL;
+}
+
+static void
+gst_pulsering_destroy_context (GstPulseRingBuffer * pbuf)
+{
+ gst_pulsering_destroy_stream (pbuf);
+
+ if (pbuf->context) {
+ pa_context_disconnect (pbuf->context);
+
+ /* Make sure we don't get any further callbacks */
+ pa_context_set_state_callback (pbuf->context, NULL, NULL);
+ pa_context_set_subscribe_callback (pbuf->context, NULL, NULL);
+
+ pa_context_unref (pbuf->context);
+ pbuf->context = NULL;
+ }
+}
+
+static void
+gst_pulseringbuffer_finalize (GObject * object)
+{
+ GstPulseRingBuffer *ringbuffer;
+
+ ringbuffer = GST_PULSERING_BUFFER_CAST (object);
+
+ gst_pulsering_destroy_context (ringbuffer);
+
+ G_OBJECT_CLASS (ring_parent_class)->finalize (object);
+}
+
+static gboolean
+gst_pulsering_is_dead (GstPulseSink * psink, GstPulseRingBuffer * pbuf)
+{
+ if (!pbuf->context
+ || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pbuf->context))
+ || !pbuf->stream
+ || !PA_STREAM_IS_GOOD (pa_stream_get_state (pbuf->stream))) {
+ const gchar *err_str = pbuf->context ?
+ pa_strerror (pa_context_errno (pbuf->context)) : NULL;
+
+ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected: %s",
+ err_str), (NULL));
+ return TRUE;
+ }
+ return FALSE;
+}
+
+static void
+gst_pulsering_context_state_cb (pa_context * c, void *userdata)
+{
+ GstPulseSink *psink;
+ GstPulseRingBuffer *pbuf;
+ pa_context_state_t state;
+
+ pbuf = GST_PULSERING_BUFFER_CAST (userdata);
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
+
+ state = pa_context_get_state (c);
+ GST_LOG_OBJECT (psink, "got new context state %d", state);
+
+ switch (state) {
+ case PA_CONTEXT_READY:
+ case PA_CONTEXT_TERMINATED:
+ case PA_CONTEXT_FAILED:
+ GST_LOG_OBJECT (psink, "signaling");
+ pa_threaded_mainloop_signal (psink->mainloop, 0);
+ break;
+
+ case PA_CONTEXT_UNCONNECTED:
+ case PA_CONTEXT_CONNECTING:
+ case PA_CONTEXT_AUTHORIZING:
+ case PA_CONTEXT_SETTING_NAME:
+ break;
+ }
+}
+
+#if HAVE_PULSE_0_9_12
+static void
+gst_pulsering_context_subscribe_cb (pa_context * c,
+ pa_subscription_event_type_t t, uint32_t idx, void *userdata)
+{
+ GstPulseSink *psink;
+ GstPulseRingBuffer *pbuf;
+
+ pbuf = GST_PULSERING_BUFFER_CAST (userdata);
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
+
+ if (t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_CHANGE) &&
+ t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_NEW))
+ return;
+
+ if (!psink->stream)
+ return;
+
+ if (idx != pa_stream_get_index (pbuf->stream))
+ return;
+
+ /* Actually this event is also triggered when other properties of
+ * the stream change that are unrelated to the volume. However it is
+ * probably cheaper to signal the change here and check for the
+ * volume when the GObject property is read instead of querying it always. */
+
+ /* inform streaming thread to notify */
+ g_atomic_int_compare_and_exchange (&psink->notify, 0, 1);
+}
+#endif
-static gboolean gst_pulsesink_is_dead (GstPulseSink * pulsesink);
+/* will be called when the device should be opened. In this case we will connect
+ * to the server. We should not try to open any streams in this state. */
+static gboolean
+gst_pulseringbuffer_open_device (GstRingBuffer * buf)
+{
+ GstPulseSink *psink;
+ GstPulseRingBuffer *pbuf;
+ gchar *name;
+ pa_mainloop_api *api;
+
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
+ pbuf = GST_PULSERING_BUFFER_CAST (buf);
+
+ g_assert (!pbuf->context);
+ g_assert (!pbuf->stream);
+
+ name = gst_pulse_client_name ();
+
+ pa_threaded_mainloop_lock (psink->mainloop);
+
+ /* get the mainloop api and create a context */
+ GST_LOG_OBJECT (psink, "new context with name %s", GST_STR_NULL (name));
+ api = pa_threaded_mainloop_get_api (psink->mainloop);
+ if (!(pbuf->context = pa_context_new (api, name)))
+ goto create_failed;
+
+ /* register some essential callbacks */
+ pa_context_set_state_callback (pbuf->context,
+ gst_pulsering_context_state_cb, pbuf);
+#if HAVE_PULSE_0_9_12
+ pa_context_set_subscribe_callback (psink->context,
+ gst_pulsering_context_subscribe_cb, pbuf);
+#endif
+
+ /* try to connect to the server and wait for completioni, we don't want to
+ * autospawn a deamon */
+ GST_LOG_OBJECT (psink, "connect to server %s", GST_STR_NULL (psink->server));
+ if (pa_context_connect (pbuf->context, psink->server, PA_CONTEXT_NOAUTOSPAWN,
+ NULL) < 0)
+ goto connect_failed;
+
+ for (;;) {
+ pa_context_state_t state;
+
+ state = pa_context_get_state (pbuf->context);
+
+ GST_LOG_OBJECT (psink, "context state is now %d", state);
+
+ if (!PA_CONTEXT_IS_GOOD (state))
+ goto connect_failed;
+
+ if (state == PA_CONTEXT_READY)
+ break;
+
+ /* Wait until the context is ready */
+ GST_LOG_OBJECT (psink, "waiting..");
+ pa_threaded_mainloop_wait (psink->mainloop);
+ }
+
+ GST_LOG_OBJECT (psink, "opened the device");
+
+ pa_threaded_mainloop_unlock (psink->mainloop);
+ g_free (name);
+
+ return TRUE;
+
+ /* ERRORS */
+unlock_and_fail:
+ {
+ gst_pulsering_destroy_context (pbuf);
+
+ pa_threaded_mainloop_unlock (psink->mainloop);
+ g_free (name);
+ return FALSE;
+ }
+create_failed:
+ {
+ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
+ ("Failed to create context"), (NULL));
+ goto unlock_and_fail;
+ }
+connect_failed:
+ {
+ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Failed to connect: %s",
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ goto unlock_and_fail;
+ }
+}
+
+/* close the device */
+static gboolean
+gst_pulseringbuffer_close_device (GstRingBuffer * buf)
+{
+ GstPulseSink *psink;
+ GstPulseRingBuffer *pbuf;
+
+ pbuf = GST_PULSERING_BUFFER_CAST (buf);
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
+
+ GST_LOG_OBJECT (psink, "closing device");
+
+ pa_threaded_mainloop_lock (psink->mainloop);
+ gst_pulsering_destroy_context (pbuf);
+ pa_threaded_mainloop_unlock (psink->mainloop);
+
+ GST_LOG_OBJECT (psink, "closed device");
+
+ return TRUE;
+}
+
+static void
+gst_pulsering_stream_state_cb (pa_stream * s, void *userdata)
+{
+ GstPulseSink *psink;
+ GstPulseRingBuffer *pbuf;
+ pa_stream_state_t state;
+
+ pbuf = GST_PULSERING_BUFFER_CAST (userdata);
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
+
+ state = pa_stream_get_state (s);
+ GST_LOG_OBJECT (psink, "got new stream state %d", state);
+
+ switch (state) {
+ case PA_STREAM_READY:
+ case PA_STREAM_FAILED:
+ case PA_STREAM_TERMINATED:
+ GST_LOG_OBJECT (psink, "signaling");
+ pa_threaded_mainloop_signal (psink->mainloop, 0);
+ break;
+ case PA_STREAM_UNCONNECTED:
+ case PA_STREAM_CREATING:
+ break;
+ }
+}
+
+static void
+gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata)
+{
+ GstPulseSink *psink;
+ GstPulseRingBuffer *pbuf;
+
+ pbuf = GST_PULSERING_BUFFER_CAST (userdata);
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
+
+ GST_LOG_OBJECT (psink, "got request for length %" G_GSIZE_FORMAT, length);
+
+ if (pbuf->in_commit) {
+ pa_threaded_mainloop_signal (psink->mainloop, 0);
+ }
+}
+
+static void
+gst_pulsering_stream_latency_update_cb (pa_stream * s, void *userdata)
+{
+ GstPulseSink *psink;
+ GstPulseRingBuffer *pbuf;
+
+ pbuf = GST_PULSERING_BUFFER_CAST (userdata);
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
+
+ GST_LOG_OBJECT (psink, "got latency update callback");
+
+ pa_threaded_mainloop_signal (psink->mainloop, 0);
+}
+
+/* This method should create a new stream of the given @spec. No playback should
+ * start yet so we start in the corked state. */
+static gboolean
+gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
+{
+ GstPulseSink *psink;
+ GstPulseRingBuffer *pbuf;
+ pa_buffer_attr buf_attr;
+ const pa_buffer_attr *buf_attr_ptr;
+ pa_channel_map channel_map;
+ pa_operation *o = NULL;
+ pa_cvolume v, *pv;
+ pa_stream_flags_t flags;
+ const gchar *name;
+
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
+ pbuf = GST_PULSERING_BUFFER_CAST (buf);
+
+ GST_LOG_OBJECT (psink, "creating sample spec");
+ /* convert the gstreamer sample spec to the pulseaudio format */
+ if (!gst_pulse_fill_sample_spec (spec, &pbuf->sample_spec))
+ goto invalid_spec;
+
+ pa_threaded_mainloop_lock (psink->mainloop);
+
+ /* we need a context and a no stream */
+ g_assert (pbuf->context);
+ g_assert (!pbuf->stream);
+
+ /* enable event notifications */
+ GST_LOG_OBJECT (psink, "subscribing to context events");
+ if (!(o = pa_context_subscribe (pbuf->context,
+ PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL)))
+ goto subscribe_failed;
+
+ pa_operation_unref (o);
+
+ /* initialize the channel map */
+ gst_pulse_gst_to_channel_map (&channel_map, spec);
+
+ /* find a good name for the stream */
+ if (psink->stream_name)
+ name = psink->stream_name;
+ else
+ name = "Playback Stream";
+
+ /* create a stream */
+ GST_LOG_OBJECT (psink, "creating stream with name %s", name);
+ if (!(pbuf->stream = pa_stream_new (pbuf->context,
+ name, &pbuf->sample_spec, &channel_map)))
+ goto stream_failed;
+
+ /* install essential callbacks */
+ pa_stream_set_state_callback (pbuf->stream,
+ gst_pulsering_stream_state_cb, pbuf);
+ pa_stream_set_write_callback (pbuf->stream,
+ gst_pulsering_stream_request_cb, pbuf);
+ pa_stream_set_latency_update_callback (pbuf->stream,
+ gst_pulsering_stream_latency_update_cb, pbuf);
+
+ /* buffering requirements */
+ memset (&buf_attr, 0, sizeof (buf_attr));
+ buf_attr.tlength = spec->segtotal * spec->segsize;
+ buf_attr.maxlength = buf_attr.tlength * 2;
+ //buf_attr.prebuf = buf_attr.tlength;
+ buf_attr.prebuf = spec->segsize;
+ buf_attr.minreq = spec->segsize;
+
+ GST_INFO_OBJECT (psink, "tlength: %d", buf_attr.tlength);
+ GST_INFO_OBJECT (psink, "maxlength: %d", buf_attr.maxlength);
+ GST_INFO_OBJECT (psink, "prebuf: %d", buf_attr.prebuf);
+ GST_INFO_OBJECT (psink, "minreq: %d", buf_attr.minreq);
+
+ /* configure volume when we changed it, else we leave the default */
+ if (psink->volume_set) {
+ GST_LOG_OBJECT (psink, "have volume of %f", psink->volume);
+ pv = &v;
+ gst_pulse_cvolume_from_linear (pv, pbuf->sample_spec.channels,
+ psink->volume);
+ } else {
+ pv = NULL;
+ }
+
+ /* construct the flags */
+ flags = PA_STREAM_INTERPOLATE_TIMING |
+ PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_NOT_MONOTONOUS |
+#if HAVE_PULSE_0_9_11
+ PA_STREAM_ADJUST_LATENCY |
+#endif
+ PA_STREAM_START_CORKED;
+
+ /* we always start corked (see flags above) */
+ pbuf->corked = TRUE;
+
+ /* try to connect now */
+ GST_LOG_OBJECT (psink, "connect for playback to device %s",
+ GST_STR_NULL (psink->device));
+ if (pa_stream_connect_playback (pbuf->stream, psink->device,
+ &buf_attr, flags, pv, NULL) < 0)
+ goto connect_failed;
+
+ for (;;) {
+ pa_stream_state_t state;
+
+ state = pa_stream_get_state (pbuf->stream);
+
+ GST_LOG_OBJECT (psink, "stream state is now %d", state);
+
+ if (!PA_STREAM_IS_GOOD (state))
+ goto connect_failed;
+
+ if (state == PA_STREAM_READY)
+ break;
+
+ /* Wait until the stream is ready */
+ pa_threaded_mainloop_wait (psink->mainloop);
+ }
+
+ GST_LOG_OBJECT (psink, "stream is acquired now");
+
+ /* get the actual buffering properties now */
+ buf_attr_ptr = pa_stream_get_buffer_attr (pbuf->stream);
+
+ GST_INFO_OBJECT (psink, "tlength: %d", buf_attr_ptr->tlength);
+ GST_INFO_OBJECT (psink, "maxlength: %d", buf_attr_ptr->maxlength);
+ GST_INFO_OBJECT (psink, "prebuf: %d", buf_attr_ptr->prebuf);
+ GST_INFO_OBJECT (psink, "minreq: %d", buf_attr_ptr->minreq);
+
+ spec->segsize = buf_attr.minreq;
+ spec->segtotal = buf_attr.tlength / spec->segsize;
+
+ pa_threaded_mainloop_unlock (psink->mainloop);
+
+ return TRUE;
+
+ /* ERRORS */
+unlock_and_fail:
+ {
+ gst_pulsering_destroy_stream (pbuf);
+ pa_threaded_mainloop_unlock (psink->mainloop);
+
+ return FALSE;
+ }
+invalid_spec:
+ {
+ GST_ELEMENT_ERROR (psink, RESOURCE, SETTINGS,
+ ("Invalid sample specification."), (NULL));
+ return FALSE;
+ }
+subscribe_failed:
+ {
+ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
+ ("pa_context_subscribe() failed: %s",
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ goto unlock_and_fail;
+ }
+stream_failed:
+ {
+ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
+ ("Failed to create stream: %s",
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ goto unlock_and_fail;
+ }
+connect_failed:
+ {
+ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
+ ("Failed to connect stream: %s",
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ goto unlock_and_fail;
+ }
+}
+
+/* free the stream that we acquired before */
+static gboolean
+gst_pulseringbuffer_release (GstRingBuffer * buf)
+{
+ GstPulseSink *psink;
+ GstPulseRingBuffer *pbuf;
+
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
+ pbuf = GST_PULSERING_BUFFER_CAST (buf);
+
+ pa_threaded_mainloop_lock (psink->mainloop);
+ gst_pulsering_destroy_stream (pbuf);
+ pa_threaded_mainloop_unlock (psink->mainloop);
+
+ return TRUE;
+}
+
+/* this method should start the thread that starts pulling data. Usually only
+ * used in pull-based scheduling */
+static gboolean
+gst_pulseringbuffer_activate (GstRingBuffer * buf, gboolean active)
+{
+ GstPulseSink *psink;
+
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
+
+ return TRUE;
+}
+
+/* update the corked state of a stream, must be called with the mainloop
+ * lock */
+static gboolean
+gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked)
+{
+ pa_operation *o = NULL;
+ GstPulseSink *psink;
+ gboolean res = FALSE;
+
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
+
+ GST_DEBUG_OBJECT (psink, "setting corked state to %d", corked);
+ if (pbuf->corked != corked) {
+ if (!(o = pa_stream_cork (pbuf->stream, corked, NULL, NULL)))
+ goto cork_failed;
+
+ while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
+ pa_threaded_mainloop_wait (psink->mainloop);
+ if (gst_pulsering_is_dead (psink, pbuf))
+ goto server_dead;
+ }
+ pbuf->corked = corked;
+ }
+ res = TRUE;
+
+cleanup:
+ if (o)
+ pa_operation_unref (o);
+
+ return res;
+
+ /* ERRORS */
+server_dead:
+ {
+ GST_DEBUG_OBJECT (psink, "the server is dead");
+ goto cleanup;
+ }
+cork_failed:
+ {
+ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
+ ("pa_stream_cork() failed: %s",
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ goto cleanup;
+ }
+}
+
+/* start/resume playback ASAP */
+static gboolean
+gst_pulseringbuffer_start (GstRingBuffer * buf)
+{
+ GstPulseSink *psink;
+ GstPulseRingBuffer *pbuf;
+ gboolean res;
+
+ pbuf = GST_PULSERING_BUFFER_CAST (buf);
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
+
+ GST_DEBUG_OBJECT (psink, "uncorking");
+ pa_threaded_mainloop_lock (psink->mainloop);
+ pbuf->paused = FALSE;
+ res = gst_pulsering_set_corked (pbuf, FALSE);
+ pa_threaded_mainloop_unlock (psink->mainloop);
+
+ return res;
+}
+
+/* pause/stop playback ASAP */
+static gboolean
+gst_pulseringbuffer_pause (GstRingBuffer * buf)
+{
+ GstPulseSink *psink;
+ GstPulseRingBuffer *pbuf;
+ gboolean res;
+
+ pbuf = GST_PULSERING_BUFFER_CAST (buf);
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
+
+ GST_DEBUG_OBJECT (psink, "corking");
+ pa_threaded_mainloop_lock (psink->mainloop);
+ /* make sure the commit method stops writing */
+ pbuf->paused = TRUE;
+ res = gst_pulsering_set_corked (pbuf, TRUE);
+ if (pbuf->in_commit) {
+ /* we are waiting in a commit, signal */
+ GST_DEBUG_OBJECT (psink, "signal commit");
+ pa_threaded_mainloop_signal (psink->mainloop, 0);
+ }
+ pa_threaded_mainloop_unlock (psink->mainloop);
+
+ return res;
+}
+
+static void
+gst_pulsering_success_cb (pa_stream * s, int success, void *userdata)
+{
+ GstPulseRingBuffer *pbuf;
+ GstPulseSink *psink;
+
+ pbuf = GST_PULSERING_BUFFER_CAST (userdata);
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
+
+ pa_threaded_mainloop_signal (psink->mainloop, 0);
+}
+
+/* stop playback, we flush everything. */
+static gboolean
+gst_pulseringbuffer_stop (GstRingBuffer * buf)
+{
+ GstPulseSink *psink;
+ GstPulseRingBuffer *pbuf;
+ gboolean res = FALSE;
+ pa_operation *o = NULL;
+
+ pbuf = GST_PULSERING_BUFFER_CAST (buf);
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
+
+ pa_threaded_mainloop_lock (psink->mainloop);
+ pbuf->paused = TRUE;
+ /* Inform anyone waiting in _commit() call that it shall wakeup */
+ if (pbuf->in_commit) {
+ GST_DEBUG_OBJECT (psink, "signal commit thread");
+ pa_threaded_mainloop_signal (psink->mainloop, 0);
+ }
+
+ /* then try to flush, it's not fatal when this fails */
+ GST_DEBUG_OBJECT (psink, "flushing");
+ if ((o = pa_stream_flush (pbuf->stream, gst_pulsering_success_cb, pbuf))) {
+ while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
+ GST_DEBUG_OBJECT (psink, "wait for completion");
+ pa_threaded_mainloop_wait (psink->mainloop);
+ if (gst_pulsering_is_dead (psink, pbuf))
+ goto server_dead;
+ }
+ GST_DEBUG_OBJECT (psink, "flush completed");
+ }
+
+ res = TRUE;
+
+cleanup:
+ if (o) {
+ pa_operation_cancel (o);
+ pa_operation_unref (o);
+ }
+ pa_threaded_mainloop_unlock (psink->mainloop);
+
+ return res;
+
+ /* ERRORS */
+server_dead:
+ {
+ GST_DEBUG_OBJECT (psink, "the server is dead");
+ goto cleanup;
+ }
+}
+
+/* in_samples >= out_samples, rate > 1.0 */
+#define FWD_UP_SAMPLES(s,se,d,de) \
+G_STMT_START { \
+ guint8 *sb = s, *db = d; \
+ while (s <= se && d < de) { \
+ memcpy (d, s, bps); \
+ s += bps; \
+ *accum += outr; \
+ if ((*accum << 1) >= inr) { \
+ *accum -= inr; \
+ d += bps; \
+ } \
+ } \
+ in_samples -= (s - sb)/bps; \
+ out_samples -= (d - db)/bps; \
+ GST_DEBUG ("fwd_up end %d/%d",*accum,*toprocess); \
+} G_STMT_END
+
+/* out_samples > in_samples, for rates smaller than 1.0 */
+#define FWD_DOWN_SAMPLES(s,se,d,de) \
+G_STMT_START { \
+ guint8 *sb = s, *db = d; \
+ while (s <= se && d < de) { \
+ memcpy (d, s, bps); \
+ d += bps; \
+ *accum += inr; \
+ if ((*accum << 1) >= outr) { \
+ *accum -= outr; \
+ s += bps; \
+ } \
+ } \
+ in_samples -= (s - sb)/bps; \
+ out_samples -= (d - db)/bps; \
+ GST_DEBUG ("fwd_down end %d/%d",*accum,*toprocess); \
+} G_STMT_END
+
+#define REV_UP_SAMPLES(s,se,d,de) \
+G_STMT_START { \
+ guint8 *sb = se, *db = d; \
+ while (s <= se && d < de) { \
+ memcpy (d, se, bps); \
+ se -= bps; \
+ *accum += outr; \
+ while ((*accum << 1) >= inr) { \
+ *accum -= inr; \
+ d += bps; \
+ } \
+ } \
+ in_samples -= (sb - se)/bps; \
+ out_samples -= (d - db)/bps; \
+ GST_DEBUG ("rev_up end %d/%d",*accum,*toprocess); \
+} G_STMT_END
+
+#define REV_DOWN_SAMPLES(s,se,d,de) \
+G_STMT_START { \
+ guint8 *sb = se, *db = d; \
+ while (s <= se && d < de) { \
+ memcpy (d, se, bps); \
+ d += bps; \
+ *accum += inr; \
+ while ((*accum << 1) >= outr) { \
+ *accum -= outr; \
+ se -= bps; \
+ } \
+ } \
+ in_samples -= (sb - se)/bps; \
+ out_samples -= (d - db)/bps; \
+ GST_DEBUG ("rev_down end %d/%d",*accum,*toprocess); \
+} G_STMT_END
+
+
+/* our custom commit function because we write into the buffer of pulseaudio
+ * instead of keeping our own buffer */
+static guint
+gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample,
+ guchar * data, gint in_samples, gint out_samples, gint * accum)
+{
+ GstPulseSink *psink;
+ GstPulseRingBuffer *pbuf;
+ guint result;
+ guint bps;
+ guint8 *data_end;
+ gboolean reverse;
+ gint *toprocess;
+ gint inr, outr;
+
+ pbuf = GST_PULSERING_BUFFER_CAST (buf);
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
+
+ /* FIXME post message rather than using a signal (as mixer interface) */
+ if (g_atomic_int_compare_and_exchange (&psink->notify, 1, 0))
+ g_object_notify (G_OBJECT (psink), "volume");
+
+ /* make sure the ringbuffer is started */
+ if (G_UNLIKELY (g_atomic_int_get (&buf->state) !=
+ GST_RING_BUFFER_STATE_STARTED)) {
+ /* see if we are allowed to start it */
+ if (G_UNLIKELY (g_atomic_int_get (&buf->abidata.ABI.may_start) == FALSE))
+ goto no_start;
+
+ GST_DEBUG_OBJECT (buf, "start!");
+ if (!gst_ring_buffer_start (buf))
+ goto start_failed;
+ }
+
+ pa_threaded_mainloop_lock (psink->mainloop);
+ GST_DEBUG_OBJECT (psink, "entering commit");
+ pbuf->in_commit = TRUE;
+
+ bps = buf->spec.bytes_per_sample;
+
+ /* our toy resampler for trick modes */
+ reverse = out_samples < 0;
+ out_samples = ABS (out_samples);
+
+ if (in_samples >= out_samples)
+ toprocess = &in_samples;
+ else
+ toprocess = &out_samples;
+
+ inr = in_samples - 1;
+ outr = out_samples - 1;
+
+ /* data_end points to the last sample we have to write, not past it. This is
+ * needed to properly handle reverse playback: it points to the last sample. */
+ data_end = data + (bps * inr);
+
+ if (pbuf->paused)
+ goto was_paused;
+
+ while (*toprocess > 0) {
+ size_t avail;
+ guint towrite;
+ gint64 offset;
+
+ GST_LOG_OBJECT (psink, "need to write %d samples", *toprocess);
+ for (;;) {
+ if ((avail = pa_stream_writable_size (pbuf->stream)) == (size_t) - 1)
+ goto writable_size_failed;
+
+ /* convert to samples, we can only deal with multiples of the
+ * sample size */
+ avail /= bps;
+
+ /* We always try to satisfy a request for data */
+ GST_LOG_OBJECT (psink, "writable samples %" G_GSIZE_FORMAT, avail);
+ if (avail > 0)
+ break;
+
+ /* we can't write a single byte, wait a bit */
+ GST_LOG_OBJECT (psink, "waiting for free space");
+ pa_threaded_mainloop_wait (psink->mainloop);
+
+ if (pbuf->paused)
+ goto was_paused;
+ }
+
+ if (avail > out_samples)
+ avail = out_samples;
+
+ GST_LOG_OBJECT (psink, "writing %d samples at offset %" G_GUINT64_FORMAT,
+ avail, *sample);
+
+ offset = *sample * bps;
+ towrite = avail * bps;
+
+ if (G_LIKELY (inr == outr && !reverse)) {
+ /* no rate conversion, simply write out the samples */
+ if (pa_stream_write (pbuf->stream, data, towrite, NULL, offset,
+ PA_SEEK_ABSOLUTE) < 0)
+ goto write_failed;
+
+ data += towrite;
+ in_samples -= avail;
+ out_samples -= avail;
+ } else {
+ guint8 *dest, *d, *d_end;
+
+ /* we need to allocate a temporary buffer to resample the data into,
+ * FIXME, we should have a pulseaudio API to allocate this buffer for us
+ * from the shared memory. */
+ dest = d = g_malloc (towrite);
+ d_end = d + towrite;
+
+ if (!reverse) {
+ if (inr >= outr)
+ /* forward speed up */
+ FWD_UP_SAMPLES (data, data_end, d, d_end);
+ else
+ /* forward slow down */
+ FWD_DOWN_SAMPLES (data, data_end, d, d_end);
+ } else {
+ if (inr >= outr)
+ /* reverse speed up */
+ REV_UP_SAMPLES (data, data_end, d, d_end);
+ else
+ /* reverse slow down */
+ REV_DOWN_SAMPLES (data, data_end, d, d_end);
+ }
+ /* see what we have left to write */
+ towrite = (d - dest);
+ if (pa_stream_write (pbuf->stream, dest, towrite,
+ g_free, offset, PA_SEEK_ABSOLUTE) < 0)
+ goto write_failed;
+
+ avail = towrite / bps;
+ }
+ *sample += avail;
+ }
+ /* we consumed all samples here */
+ data = data_end + bps;
+
+ pbuf->in_commit = FALSE;
+ pa_threaded_mainloop_unlock (psink->mainloop);
+
+done:
+ result = inr - ((data_end - data) / bps);
+ GST_LOG_OBJECT (psink, "wrote %d samples", result);
+
+ return result;
+
+ /* ERRORS */
+unlock_and_fail:
+ {
+ pbuf->in_commit = FALSE;
+ GST_LOG_OBJECT (psink, "we are reset");
+ pa_threaded_mainloop_unlock (psink->mainloop);
+ goto done;
+ }
+no_start:
+ {
+ GST_LOG_OBJECT (psink, "we can not start");
+ return 0;
+ }
+start_failed:
+ {
+ GST_LOG_OBJECT (psink, "failed to start the ringbuffer");
+ return 0;
+ }
+was_paused:
+ {
+ pbuf->in_commit = FALSE;
+ GST_LOG_OBJECT (psink, "we are paused");
+ pa_threaded_mainloop_unlock (psink->mainloop);
+ goto done;
+ }
+writable_size_failed:
+ {
+ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
+ ("pa_stream_writable_size() failed: %s",
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ goto unlock_and_fail;
+ }
+write_failed:
+ {
+ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
+ ("pa_stream_write() failed: %s",
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ goto unlock_and_fail;
+ }
+}
+
+static void gst_pulsesink_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec);
+static void gst_pulsesink_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec);
+static void gst_pulsesink_finalize (GObject * object);
+
+static gboolean gst_pulsesink_event (GstBaseSink * sink, GstEvent * event);
+
+static void gst_pulsesink_init_interfaces (GType type);
#if (G_BYTE_ORDER == G_LITTLE_ENDIAN)
# define ENDIANNESS "LITTLE_ENDIAN, BIG_ENDIAN"
@@ -106,14 +1135,14 @@ static gboolean gst_pulsesink_is_dead (GstPulseSink * pulsesink);
#endif
GST_IMPLEMENT_PULSEPROBE_METHODS (GstPulseSink, gst_pulsesink);
-GST_BOILERPLATE_FULL (GstPulseSink, gst_pulsesink, GstAudioSink,
- GST_TYPE_AUDIO_SINK, gst_pulsesink_init_interfaces);
+GST_BOILERPLATE_FULL (GstPulseSink, gst_pulsesink, GstBaseAudioSink,
+ GST_TYPE_BASE_AUDIO_SINK, gst_pulsesink_init_interfaces);
static gboolean
gst_pulsesink_interface_supported (GstImplementsInterface *
iface, GType interface_type)
{
- GstPulseSink *this = GST_PULSESINK (iface);
+ GstPulseSink *this = GST_PULSESINK_CAST (iface);
if (interface_type == GST_TYPE_PROPERTY_PROBE && this->probe)
return TRUE;
@@ -150,7 +1179,6 @@ gst_pulsesink_init_interfaces (GType type)
static void
gst_pulsesink_base_init (gpointer g_class)
{
-
static GstStaticPadTemplate pad_template = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
@@ -195,30 +1223,33 @@ gst_pulsesink_base_init (gpointer g_class)
gst_static_pad_template_get (&pad_template));
}
+static GstRingBuffer *
+gst_pulsesink_create_ringbuffer (GstBaseAudioSink * sink)
+{
+ GstRingBuffer *buffer;
+
+ GST_DEBUG_OBJECT (sink, "creating ringbuffer");
+ buffer = g_object_new (GST_TYPE_PULSERING_BUFFER, NULL);
+ GST_DEBUG_OBJECT (sink, "created ringbuffer @%p", buffer);
+
+ return buffer;
+}
+
static void
gst_pulsesink_class_init (GstPulseSinkClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
- GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass);
- GstAudioSinkClass *gstaudiosink_class = GST_AUDIO_SINK_CLASS (klass);
+ GstBaseAudioSinkClass *gstaudiosink_class = GST_BASE_AUDIO_SINK_CLASS (klass);
gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_pulsesink_finalize);
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_pulsesink_set_property);
gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_pulsesink_get_property);
- gstelement_class->change_state =
- GST_DEBUG_FUNCPTR (gst_pulsesink_change_state);
-
gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_pulsesink_event);
- gstaudiosink_class->open = GST_DEBUG_FUNCPTR (gst_pulsesink_open);
- gstaudiosink_class->close = GST_DEBUG_FUNCPTR (gst_pulsesink_close);
- gstaudiosink_class->prepare = GST_DEBUG_FUNCPTR (gst_pulsesink_prepare);
- gstaudiosink_class->unprepare = GST_DEBUG_FUNCPTR (gst_pulsesink_unprepare);
- gstaudiosink_class->write = GST_DEBUG_FUNCPTR (gst_pulsesink_write);
- gstaudiosink_class->delay = GST_DEBUG_FUNCPTR (gst_pulsesink_delay);
- gstaudiosink_class->reset = GST_DEBUG_FUNCPTR (gst_pulsesink_reset);
+ gstaudiosink_class->create_ringbuffer =
+ GST_DEBUG_FUNCPTR (gst_pulsesink_create_ringbuffer);
/* Overwrite GObject fields */
g_object_class_install_property (gobject_class,
@@ -244,91 +1275,66 @@ gst_pulsesink_class_init (GstPulseSinkClass * klass)
#endif
}
-static void
-gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass)
+/* returns the current time of the sink ringbuffer */
+static GstClockTime
+gst_pulse_sink_get_time (GstClock * clock, GstBaseAudioSink * sink)
{
- int e;
-
- pulsesink->server = pulsesink->device = pulsesink->stream_name =
- pulsesink->device_description = NULL;
+ GstPulseSink *psink;
+ GstPulseRingBuffer *pbuf;
+ pa_usec_t time;
- pulsesink->context = NULL;
- pulsesink->stream = NULL;
+ if (sink->ringbuffer == NULL || sink->ringbuffer->spec.rate == 0)
+ return GST_CLOCK_TIME_NONE;
- pulsesink->volume = 1.0;
- pulsesink->volume_set = FALSE;
-
-#if HAVE_PULSE_0_9_13
- pa_sample_spec_init (&pulsesink->sample_spec);
-#else
- pulsesink->sample_spec.format = PA_SAMPLE_INVALID;
- pulsesink->sample_spec.rate = 0;
- pulsesink->sample_spec.channels = 0;
-#endif
+ pbuf = GST_PULSERING_BUFFER_CAST (sink->ringbuffer);
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
- pulsesink->operation_success = FALSE;
- pulsesink->did_reset = FALSE;
- pulsesink->in_write = FALSE;
- pulsesink->notify = 0;
+ pa_threaded_mainloop_lock (psink->mainloop);
+ /* if we don't have enough data to get a timestamp, just return 0 */
+ if (pa_stream_get_time (pbuf->stream, &time) < 0)
+ time = 0;
+ pa_threaded_mainloop_unlock (psink->mainloop);
- pulsesink->mainloop = pa_threaded_mainloop_new ();
- g_assert (pulsesink->mainloop);
+ time *= 1000;
- e = pa_threaded_mainloop_start (pulsesink->mainloop);
- g_assert (e == 0);
+ GST_LOG_OBJECT (psink, "current time is %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (time));
- pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink), G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device, TRUE, FALSE); /* TRUE for sinks, FALSE for sources */
+ return time;
}
static void
-gst_pulsesink_destroy_stream (GstPulseSink * pulsesink)
+gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass)
{
- if (pulsesink->stream) {
- pa_stream_disconnect (pulsesink->stream);
-
- /* Make sure we don't get any further callbacks */
- pa_stream_set_state_callback (pulsesink->stream, NULL, NULL);
- pa_stream_set_write_callback (pulsesink->stream, NULL, NULL);
- pa_stream_set_latency_update_callback (pulsesink->stream, NULL, NULL);
-
- pa_stream_unref (pulsesink->stream);
- pulsesink->stream = NULL;
- }
-
- g_free (pulsesink->stream_name);
- pulsesink->stream_name = NULL;
-
- g_free (pulsesink->device_description);
+ pulsesink->server = NULL;
+ pulsesink->device = NULL;
pulsesink->device_description = NULL;
-}
-static void
-gst_pulsesink_destroy_context (GstPulseSink * pulsesink)
-{
+ pulsesink->volume = 1.0;
+ pulsesink->volume_set = FALSE;
- gst_pulsesink_destroy_stream (pulsesink);
+ pulsesink->notify = 0;
- if (pulsesink->context) {
- pa_context_disconnect (pulsesink->context);
+ g_assert ((pulsesink->mainloop = pa_threaded_mainloop_new ()));
+ g_assert (pa_threaded_mainloop_start (pulsesink->mainloop) == 0);
- /* Make sure we don't get any further callbacks */
- pa_context_set_state_callback (pulsesink->context, NULL, NULL);
- pa_context_set_subscribe_callback (pulsesink->context, NULL, NULL);
+ pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink), G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device, TRUE, FALSE); /* TRUE for sinks, FALSE for sources */
- pa_context_unref (pulsesink->context);
- pulsesink->context = NULL;
- }
+ /* override with a custom clock */
+ if (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock)
+ gst_object_unref (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock);
+ GST_BASE_AUDIO_SINK (pulsesink)->provided_clock =
+ gst_audio_clock_new ("GstPulseSinkClock",
+ (GstAudioClockGetTimeFunc) gst_pulse_sink_get_time, pulsesink);
}
static void
gst_pulsesink_finalize (GObject * object)
{
- GstPulseSink *pulsesink = GST_PULSESINK (object);
+ GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
pa_threaded_mainloop_stop (pulsesink->mainloop);
- gst_pulsesink_destroy_context (pulsesink);
-
g_free (pulsesink->server);
g_free (pulsesink->device);
@@ -344,203 +1350,208 @@ gst_pulsesink_finalize (GObject * object)
#if HAVE_PULSE_0_9_12
static void
-gst_pulsesink_set_volume (GstPulseSink * pulsesink, gdouble volume)
+gst_pulsesink_set_volume (GstPulseSink * psink, gdouble volume)
{
pa_cvolume v;
pa_operation *o = NULL;
+ GstPulseRingBuffer *pbuf;
- pa_threaded_mainloop_lock (pulsesink->mainloop);
+ pa_threaded_mainloop_lock (psink->mainloop);
- pulsesink->volume = volume;
- pulsesink->volume_set = TRUE;
+ psink->volume = volume;
+ psink->volume_set = TRUE;
- if (!pulsesink->stream)
+ pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
+ if (pbuf == NULL)
goto unlock;
- gst_pulse_cvolume_from_linear (&v, pulsesink->sample_spec.channels, volume);
+ gst_pulse_cvolume_from_linear (&v, pbuf->sample_spec.channels, volume);
- if (!(o = pa_context_set_sink_input_volume (pulsesink->context,
- pa_stream_get_index (pulsesink->stream), &v, NULL, NULL))) {
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
- ("pa_stream_set_sink_input_volume() failed: %s",
- pa_strerror (pa_context_errno (pulsesink->context))), (NULL));
- goto unlock;
- }
+ if (!(o = pa_context_set_sink_input_volume (pbuf->context,
+ pa_stream_get_index (pbuf->stream), &v, NULL, NULL)))
+ goto volume_failed;
/* We don't really care about the result of this call */
-
unlock:
if (o)
pa_operation_unref (o);
- pa_threaded_mainloop_unlock (pulsesink->mainloop);
+ pa_threaded_mainloop_unlock (psink->mainloop);
+
+ return;
+
+ /* ERRORS */
+volume_failed:
+ {
+ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
+ ("pa_stream_set_sink_input_volume() failed: %s",
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ goto unlock;
+ }
}
static void
gst_pulsesink_sink_input_info_cb (pa_context * c, const pa_sink_input_info * i,
int eol, void *userdata)
{
- GstPulseSink *pulsesink = GST_PULSESINK (userdata);
+ GstPulseRingBuffer *pbuf;
+ GstPulseSink *psink;
+
+ pbuf = GST_PULSERING_BUFFER_CAST (userdata);
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
if (!i)
return;
- if (!pulsesink->stream)
+ if (!pbuf->stream)
return;
- g_assert (i->index == pa_stream_get_index (pulsesink->stream));
+ g_assert (i->index == pa_stream_get_index (pbuf->stream));
- pulsesink->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume));
+ psink->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume));
}
static gdouble
-gst_pulsesink_get_volume (GstPulseSink * pulsesink)
+gst_pulsesink_get_volume (GstPulseSink * psink)
{
+ GstPulseRingBuffer *pbuf;
pa_operation *o = NULL;
gdouble v;
- pa_threaded_mainloop_lock (pulsesink->mainloop);
-
- if (!pulsesink->stream)
- goto unlock;
+ pa_threaded_mainloop_lock (psink->mainloop);
- if (!(o = pa_context_get_sink_input_info (pulsesink->context,
- pa_stream_get_index (pulsesink->stream),
- gst_pulsesink_sink_input_info_cb, pulsesink))) {
+ pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
+ if (pbuf == NULL || pbuf->stream == NULL)
+ goto no_buffer;
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
- ("pa_stream_get_sink_input_info() failed: %s",
- pa_strerror (pa_context_errno (pulsesink->context))), (NULL));
- goto unlock;
- }
+ if (!(o = pa_context_get_sink_input_info (pbuf->context,
+ pa_stream_get_index (pbuf->stream),
+ gst_pulsesink_sink_input_info_cb, pbuf)))
+ goto info_failed;
while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
-
- if (gst_pulsesink_is_dead (pulsesink))
+ pa_threaded_mainloop_wait (psink->mainloop);
+ if (gst_pulsering_is_dead (psink, pbuf))
goto unlock;
-
- pa_threaded_mainloop_wait (pulsesink->mainloop);
}
unlock:
-
if (o)
pa_operation_unref (o);
- v = pulsesink->volume;
-
- pa_threaded_mainloop_unlock (pulsesink->mainloop);
+ v = psink->volume;
+ pa_threaded_mainloop_unlock (psink->mainloop);
return v;
-}
-#endif
-
-static gboolean
-gst_pulsesink_is_dead (GstPulseSink * pulsesink)
-{
-
- if (!pulsesink->context
- || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pulsesink->context))
- || !pulsesink->stream
- || !PA_STREAM_IS_GOOD (pa_stream_get_state (pulsesink->stream))) {
- const gchar *err_str = pulsesink->context ?
- pa_strerror (pa_context_errno (pulsesink->context)) : NULL;
- GST_ELEMENT_ERROR ((pulsesink), RESOURCE, FAILED, ("Disconnected: %s",
- err_str), (NULL));
- return TRUE;
+ /* ERRORS */
+no_buffer:
+ {
+ GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
+ goto unlock;
+ }
+info_failed:
+ {
+ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
+ ("pa_stream_get_sink_input_info() failed: %s",
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ goto unlock;
}
-
- return FALSE;
}
+#endif
static void
gst_pulsesink_sink_info_cb (pa_context * c, const pa_sink_info * i, int eol,
void *userdata)
{
- GstPulseSink *pulsesink = GST_PULSESINK (userdata);
+ GstPulseRingBuffer *pbuf;
+ GstPulseSink *psink;
+
+ pbuf = GST_PULSERING_BUFFER_CAST (userdata);
+ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf));
if (!i)
return;
- if (!pulsesink->stream)
+ if (!pbuf->stream)
return;
- g_assert (i->index == pa_stream_get_device_index (pulsesink->stream));
+ g_assert (i->index == pa_stream_get_device_index (pbuf->stream));
- g_free (pulsesink->device_description);
- pulsesink->device_description = g_strdup (i->description);
+ g_free (psink->device_description);
+ psink->device_description = g_strdup (i->description);
}
static gchar *
-gst_pulsesink_device_description (GstPulseSink * pulsesink)
+gst_pulsesink_device_description (GstPulseSink * psink)
{
+ GstPulseRingBuffer *pbuf;
pa_operation *o = NULL;
gchar *t;
- pa_threaded_mainloop_lock (pulsesink->mainloop);
-
- if (!pulsesink->stream)
- goto unlock;
-
- if (!(o = pa_context_get_sink_info_by_index (pulsesink->context,
- pa_stream_get_device_index (pulsesink->stream),
- gst_pulsesink_sink_info_cb, pulsesink))) {
+ pa_threaded_mainloop_lock (psink->mainloop);
+ pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
+ if (pbuf == NULL || pbuf->stream == NULL)
+ goto no_buffer;
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
- ("pa_stream_get_sink_info() failed: %s",
- pa_strerror (pa_context_errno (pulsesink->context))), (NULL));
- goto unlock;
- }
+ if (!(o = pa_context_get_sink_info_by_index (pbuf->context,
+ pa_stream_get_device_index (pbuf->stream),
+ gst_pulsesink_sink_info_cb, pbuf)))
+ goto info_failed;
while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
-
- if (gst_pulsesink_is_dead (pulsesink))
+ pa_threaded_mainloop_wait (psink->mainloop);
+ if (gst_pulsering_is_dead (psink, pbuf))
goto unlock;
-
- pa_threaded_mainloop_wait (pulsesink->mainloop);
}
unlock:
-
if (o)
pa_operation_unref (o);
- t = g_strdup (pulsesink->device_description);
-
- pa_threaded_mainloop_unlock (pulsesink->mainloop);
+ t = g_strdup (psink->device_description);
+ pa_threaded_mainloop_unlock (psink->mainloop);
return t;
+
+ /* ERRORS */
+no_buffer:
+ {
+ GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
+ goto unlock;
+ }
+info_failed:
+ {
+ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
+ ("pa_stream_get_sink_info() failed: %s",
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ goto unlock;
+ }
}
static void
gst_pulsesink_set_property (GObject * object,
guint prop_id, const GValue * value, GParamSpec * pspec)
{
- GstPulseSink *pulsesink = GST_PULSESINK (object);
+ GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
switch (prop_id) {
case PROP_SERVER:
g_free (pulsesink->server);
pulsesink->server = g_value_dup_string (value);
-
if (pulsesink->probe)
gst_pulseprobe_set_server (pulsesink->probe, pulsesink->server);
-
break;
-
case PROP_DEVICE:
g_free (pulsesink->device);
pulsesink->device = g_value_dup_string (value);
break;
-
#if HAVE_PULSE_0_9_12
case PROP_VOLUME:
gst_pulsesink_set_volume (pulsesink, g_value_get_double (value));
break;
#endif
-
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -552,30 +1563,26 @@ gst_pulsesink_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec)
{
- GstPulseSink *pulsesink = GST_PULSESINK (object);
+ GstPulseSink *pulsesink = GST_PULSESINK_CAST (object);
switch (prop_id) {
case PROP_SERVER:
g_value_set_string (value, pulsesink->server);
break;
-
case PROP_DEVICE:
g_value_set_string (value, pulsesink->device);
break;
-
case PROP_DEVICE_NAME:{
char *t = gst_pulsesink_device_description (pulsesink);
g_value_set_string (value, t);
g_free (t);
break;
}
-
#if HAVE_PULSE_0_9_12
case PROP_VOLUME:
g_value_set_double (value, gst_pulsesink_get_volume (pulsesink));
break;
#endif
-
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -583,517 +1590,58 @@ gst_pulsesink_get_property (GObject * object,
}
static void
-gst_pulsesink_context_state_cb (pa_context * c, void *userdata)
-{
- GstPulseSink *pulsesink = GST_PULSESINK (userdata);
-
- switch (pa_context_get_state (c)) {
- case PA_CONTEXT_READY:
- case PA_CONTEXT_TERMINATED:
- case PA_CONTEXT_FAILED:
- pa_threaded_mainloop_signal (pulsesink->mainloop, 0);
- break;
-
- case PA_CONTEXT_UNCONNECTED:
- case PA_CONTEXT_CONNECTING:
- case PA_CONTEXT_AUTHORIZING:
- case PA_CONTEXT_SETTING_NAME:
- break;
- }
-}
-
-#if HAVE_PULSE_0_9_12
-static void
-gst_pulsesink_context_subscribe_cb (pa_context * c,
- pa_subscription_event_type_t t, uint32_t idx, void *userdata)
-{
- GstPulseSink *pulsesink = GST_PULSESINK (userdata);
-
- if (t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_CHANGE) &&
- t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_NEW))
- return;
-
- if (!pulsesink->stream)
- return;
-
- if (idx != pa_stream_get_index (pulsesink->stream))
- return;
-
- /* Actually this event is also triggered when other properties of
- * the stream change that are unrelated to the volume. However it is
- * probably cheaper to signal the change here and check for the
- * volume when the GObject property is read instead of querying it always. */
-
- /* inform streaming thread to notify */
- g_atomic_int_compare_and_exchange (&pulsesink->notify, 0, 1);
-}
-#endif
-
-static void
-gst_pulsesink_stream_state_cb (pa_stream * s, void *userdata)
-{
- GstPulseSink *pulsesink = GST_PULSESINK (userdata);
-
- switch (pa_stream_get_state (s)) {
-
- case PA_STREAM_READY:
- case PA_STREAM_FAILED:
- case PA_STREAM_TERMINATED:
- pa_threaded_mainloop_signal (pulsesink->mainloop, 0);
- break;
-
- case PA_STREAM_UNCONNECTED:
- case PA_STREAM_CREATING:
- break;
- }
-}
-
-static void
-gst_pulsesink_stream_request_cb (pa_stream * s, size_t length, void *userdata)
-{
- GstPulseSink *pulsesink = GST_PULSESINK (userdata);
-
- pa_threaded_mainloop_signal (pulsesink->mainloop, 0);
-}
-
-static void
-gst_pulsesink_stream_latency_update_cb (pa_stream * s, void *userdata)
-{
- GstPulseSink *pulsesink = GST_PULSESINK (userdata);
-
- pa_threaded_mainloop_signal (pulsesink->mainloop, 0);
-}
-
-static gboolean
-gst_pulsesink_open (GstAudioSink * asink)
-{
- GstPulseSink *pulsesink = GST_PULSESINK (asink);
- gchar *name = gst_pulse_client_name ();
-
- pa_threaded_mainloop_lock (pulsesink->mainloop);
-
- g_assert (!pulsesink->context);
- g_assert (!pulsesink->stream);
-
- if (!(pulsesink->context =
- pa_context_new (pa_threaded_mainloop_get_api (pulsesink->mainloop),
- name))) {
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
- ("Failed to create context"), (NULL));
- goto unlock_and_fail;
- }
-
- pa_context_set_state_callback (pulsesink->context,
- gst_pulsesink_context_state_cb, pulsesink);
-#if HAVE_PULSE_0_9_12
- pa_context_set_subscribe_callback (pulsesink->context,
- gst_pulsesink_context_subscribe_cb, pulsesink);
-#endif
-
- if (pa_context_connect (pulsesink->context, pulsesink->server, 0, NULL) < 0) {
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, ("Failed to connect: %s",
- pa_strerror (pa_context_errno (pulsesink->context))), (NULL));
- goto unlock_and_fail;
- }
-
- for (;;) {
- pa_context_state_t state;
-
- state = pa_context_get_state (pulsesink->context);
-
- if (!PA_CONTEXT_IS_GOOD (state)) {
- GST_DEBUG_OBJECT (pulsesink, "Context state was not READY. Got: %d",
- state);
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, ("Failed to connect: %s",
- pa_strerror (pa_context_errno (pulsesink->context))), (NULL));
- goto unlock_and_fail;
- }
-
- if (state == PA_CONTEXT_READY)
- break;
-
- /* Wait until the context is ready */
- pa_threaded_mainloop_wait (pulsesink->mainloop);
- }
-
- pa_threaded_mainloop_unlock (pulsesink->mainloop);
- g_free (name);
- return TRUE;
-
-unlock_and_fail:
-
- gst_pulsesink_destroy_context (pulsesink);
-
- pa_threaded_mainloop_unlock (pulsesink->mainloop);
- g_free (name);
- return FALSE;
-}
-
-static gboolean
-gst_pulsesink_close (GstAudioSink * asink)
-{
- GstPulseSink *pulsesink = GST_PULSESINK (asink);
-
- pa_threaded_mainloop_lock (pulsesink->mainloop);
- gst_pulsesink_destroy_context (pulsesink);
- pa_threaded_mainloop_unlock (pulsesink->mainloop);
-
- return TRUE;
-}
-
-static gboolean
-gst_pulsesink_prepare (GstAudioSink * asink, GstRingBufferSpec * spec)
-{
- pa_buffer_attr buf_attr;
- pa_channel_map channel_map;
- GstPulseSink *pulsesink = GST_PULSESINK (asink);
- pa_operation *o = NULL;
- pa_cvolume v;
-
- if (!gst_pulse_fill_sample_spec (spec, &pulsesink->sample_spec)) {
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, SETTINGS,
- ("Invalid sample specification."), (NULL));
- return FALSE;
- }
-
- pa_threaded_mainloop_lock (pulsesink->mainloop);
-
- if (!pulsesink->context) {
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, ("Bad context"), (NULL));
- goto unlock_and_fail;
- }
-
- g_assert (!pulsesink->stream);
-
- if (!(o =
- pa_context_subscribe (pulsesink->context,
- PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL))) {
- const gchar *err_str = pulsesink->context ?
- pa_strerror (pa_context_errno (pulsesink->context)) : NULL;
-
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
- ("pa_context_subscribe() failed: %s", err_str), (NULL));
- goto unlock_and_fail;
- }
-
- pa_operation_unref (o);
-
- if (!(pulsesink->stream = pa_stream_new (pulsesink->context,
- pulsesink->stream_name ?
- pulsesink->stream_name : "Playback Stream",
- &pulsesink->sample_spec,
- gst_pulse_gst_to_channel_map (&channel_map, spec)))) {
- const gchar *err_str = pulsesink->context ?
- pa_strerror (pa_context_errno (pulsesink->context)) : NULL;
-
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
- ("Failed to create stream: %s", err_str), (NULL));
- goto unlock_and_fail;
- }
-
- pa_stream_set_state_callback (pulsesink->stream,
- gst_pulsesink_stream_state_cb, pulsesink);
- pa_stream_set_write_callback (pulsesink->stream,
- gst_pulsesink_stream_request_cb, pulsesink);
- pa_stream_set_latency_update_callback (pulsesink->stream,
- gst_pulsesink_stream_latency_update_cb, pulsesink);
-
- memset (&buf_attr, 0, sizeof (buf_attr));
- buf_attr.tlength = spec->segtotal * spec->segsize;
- buf_attr.maxlength = buf_attr.tlength * 2;
- buf_attr.prebuf = buf_attr.tlength;
- buf_attr.minreq = spec->segsize;
-
- if (pulsesink->volume_set)
- gst_pulse_cvolume_from_linear (&v, pulsesink->sample_spec.channels,
- pulsesink->volume);
-
- if (pa_stream_connect_playback (pulsesink->stream, pulsesink->device,
- &buf_attr,
- PA_STREAM_INTERPOLATE_TIMING |
- PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_NOT_MONOTONOUS |
-#if HAVE_PULSE_0_9_11
- PA_STREAM_ADJUST_LATENCY |
-#endif
- PA_STREAM_START_CORKED, pulsesink->volume_set ? &v : NULL, NULL) < 0) {
-
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
- ("Failed to connect stream: %s",
- pa_strerror (pa_context_errno (pulsesink->context))), (NULL));
- goto unlock_and_fail;
- }
- pulsesink->corked = TRUE;
-
- for (;;) {
- pa_stream_state_t state;
-
- state = pa_stream_get_state (pulsesink->stream);
-
- if (!PA_STREAM_IS_GOOD (state)) {
- GST_DEBUG_OBJECT (pulsesink, "Stream state was not READY. Got: %d",
- state);
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
- ("Failed to connect stream: %s",
- pa_strerror (pa_context_errno (pulsesink->context))), (NULL));
- goto unlock_and_fail;
- }
-
- if (state == PA_STREAM_READY)
- break;
-
- /* Wait until the stream is ready */
- pa_threaded_mainloop_wait (pulsesink->mainloop);
- }
-
- pa_threaded_mainloop_unlock (pulsesink->mainloop);
- return TRUE;
-
-unlock_and_fail:
-
- gst_pulsesink_destroy_stream (pulsesink);
-
- pa_threaded_mainloop_unlock (pulsesink->mainloop);
-
- return FALSE;
-}
-
-static gboolean
-gst_pulsesink_unprepare (GstAudioSink * asink)
+gst_pulsesink_change_title (GstPulseSink * psink, const gchar * t)
{
- GstPulseSink *pulsesink = GST_PULSESINK (asink);
-
- pa_threaded_mainloop_lock (pulsesink->mainloop);
- gst_pulsesink_destroy_stream (pulsesink);
- pa_threaded_mainloop_unlock (pulsesink->mainloop);
-
- return TRUE;
-}
-
-static guint
-gst_pulsesink_write (GstAudioSink * asink, gpointer data, guint length)
-{
- GstPulseSink *pulsesink = GST_PULSESINK (asink);
pa_operation *o = NULL;
- size_t sum = 0;
+ GstPulseRingBuffer *pbuf;
- /* FIXME post message rather than using a signal (as mixer interface) */
- if (g_atomic_int_compare_and_exchange (&pulsesink->notify, 1, 0))
- g_object_notify (G_OBJECT (pulsesink), "volume");
+ pa_threaded_mainloop_lock (psink->mainloop);
- pa_threaded_mainloop_lock (pulsesink->mainloop);
+ pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
+ if (pbuf == NULL)
+ goto no_buffer;
- pulsesink->in_write = TRUE;
+ g_free (pbuf->stream_name);
+ pbuf->stream_name = g_strdup (t);
- /* Make sure the stream is uncorked - it might not be on a caps change */
- if (pulsesink->corked) {
- if (!(o = pa_stream_cork (pulsesink->stream, FALSE, NULL, NULL))) {
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
- ("pa_stream_cork() failed: %s",
- pa_strerror (pa_context_errno (pulsesink->context))), (NULL));
- goto unlock_and_fail;
- }
+ if (gst_pulsering_is_dead (psink, pbuf))
+ goto server_dead;
- while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
- if (gst_pulsesink_is_dead (pulsesink))
- goto unlock_and_fail;
- pa_threaded_mainloop_wait (pulsesink->mainloop);
- }
- pulsesink->corked = FALSE;
-
- pa_operation_unref (o);
- o = NULL;
- }
-
- while (length > 0) {
- size_t l;
-
- for (;;) {
- if (gst_pulsesink_is_dead (pulsesink))
- goto unlock_and_fail;
-
- if ((l = pa_stream_writable_size (pulsesink->stream)) == (size_t) - 1) {
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
- ("pa_stream_writable_size() failed: %s",
- pa_strerror (pa_context_errno (pulsesink->context))), (NULL));
- goto unlock_and_fail;
- }
-
- if (l >= length)
- break;
-
- if (pulsesink->did_reset)
- goto unlock_and_fail;
-
- pa_threaded_mainloop_wait (pulsesink->mainloop);
- }
-
- if (l > length)
- l = length;
-
- if (pa_stream_write (pulsesink->stream, data, l, NULL, 0,
- PA_SEEK_RELATIVE) < 0) {
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
- ("pa_stream_write() failed: %s",
- pa_strerror (pa_context_errno (pulsesink->context))), (NULL));
- goto unlock_and_fail;
- }
-
- data = (guint8 *) data + l;
- length -= l;
-
- sum += l;
- }
-
- pulsesink->did_reset = FALSE;
- pulsesink->in_write = FALSE;
-
- pa_threaded_mainloop_unlock (pulsesink->mainloop);
- return sum;
-
-unlock_and_fail:
-
- pulsesink->did_reset = FALSE;
- pulsesink->in_write = FALSE;
+ if (!(o = pa_stream_set_name (pbuf->stream, pbuf->stream_name, NULL, NULL)))
+ goto name_failed;
+ /* We're not interested if this operation failed or not */
+unlock:
if (o)
pa_operation_unref (o);
+ pa_threaded_mainloop_unlock (psink->mainloop);
- pa_threaded_mainloop_unlock (pulsesink->mainloop);
- return (guint) - 1;
-}
+ return;
-static guint
-gst_pulsesink_delay (GstAudioSink * asink)
-{
- GstPulseSink *pulsesink = GST_PULSESINK (asink);
- pa_usec_t t;
-
- pa_threaded_mainloop_lock (pulsesink->mainloop);
-
- for (;;) {
- if (gst_pulsesink_is_dead (pulsesink))
- goto unlock_and_fail;
-
- if (pa_stream_get_latency (pulsesink->stream, &t, NULL) >= 0)
- break;
-
- if (pa_context_errno (pulsesink->context) != PA_ERR_NODATA) {
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
- ("pa_stream_get_latency() failed: %s",
- pa_strerror (pa_context_errno (pulsesink->context))), (NULL));
- goto unlock_and_fail;
- }
-
- pa_threaded_mainloop_wait (pulsesink->mainloop);
- }
-
- pa_threaded_mainloop_unlock (pulsesink->mainloop);
-
- return gst_util_uint64_scale_int (t, pulsesink->sample_spec.rate, 1000000LL);
-
-unlock_and_fail:
-
- pa_threaded_mainloop_unlock (pulsesink->mainloop);
- return 0;
-}
-
-static void
-gst_pulsesink_success_cb (pa_stream * s, int success, void *userdata)
-{
- GstPulseSink *pulsesink = GST_PULSESINK (userdata);
-
- pulsesink->operation_success = !!success;
- pa_threaded_mainloop_signal (pulsesink->mainloop, 0);
-}
-
-static void
-gst_pulsesink_reset (GstAudioSink * asink)
-{
- GstPulseSink *pulsesink = GST_PULSESINK (asink);
- pa_operation *o = NULL;
-
- pa_threaded_mainloop_lock (pulsesink->mainloop);
-
- if (gst_pulsesink_is_dead (pulsesink))
- goto unlock_and_fail;
-
- if (!(o =
- pa_stream_flush (pulsesink->stream, gst_pulsesink_success_cb,
- pulsesink))) {
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
- ("pa_stream_flush() failed: %s",
- pa_strerror (pa_context_errno (pulsesink->context))), (NULL));
- goto unlock_and_fail;
- }
-
- /* Inform anyone waiting in _write() call that it shall wakeup */
- if (pulsesink->in_write) {
- pulsesink->did_reset = TRUE;
- pa_threaded_mainloop_signal (pulsesink->mainloop, 0);
- }
-
- pulsesink->operation_success = FALSE;
- while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
-
- if (gst_pulsesink_is_dead (pulsesink))
- goto unlock_and_fail;
-
- pa_threaded_mainloop_wait (pulsesink->mainloop);
- }
-
- if (!pulsesink->operation_success) {
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, ("Flush failed: %s",
- pa_strerror (pa_context_errno (pulsesink->context))), (NULL));
- goto unlock_and_fail;
- }
-
-unlock_and_fail:
-
- if (o) {
- pa_operation_cancel (o);
- pa_operation_unref (o);
+ /* ERRORS */
+no_buffer:
+ {
+ GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
+ goto unlock;
}
-
- pa_threaded_mainloop_unlock (pulsesink->mainloop);
-}
-
-static void
-gst_pulsesink_change_title (GstPulseSink * pulsesink, const gchar * t)
-{
- pa_operation *o = NULL;
-
- pa_threaded_mainloop_lock (pulsesink->mainloop);
-
- g_free (pulsesink->stream_name);
- pulsesink->stream_name = g_strdup (t);
-
- if (gst_pulsesink_is_dead (pulsesink))
+server_dead:
+ {
+ GST_DEBUG_OBJECT (psink, "the server is dead");
goto unlock;
-
- if (!(o =
- pa_stream_set_name (pulsesink->stream, pulsesink->stream_name, NULL,
- NULL))) {
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
+ }
+name_failed:
+ {
+ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
("pa_stream_set_name() failed: %s",
- pa_strerror (pa_context_errno (pulsesink->context))), (NULL));
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
goto unlock;
}
-
- /* We're not interested if this operation failed or not */
-
-unlock:
-
- if (o)
- pa_operation_unref (o);
-
- pa_threaded_mainloop_unlock (pulsesink->mainloop);
}
#if HAVE_PULSE_0_9_11
static void
-gst_pulsesink_change_props (GstPulseSink * pulsesink, GstTagList * l)
+gst_pulsesink_change_props (GstPulseSink * psink, GstTagList * l)
{
-
static const gchar *const map[] = {
GST_TAG_TITLE, PA_PROP_MEDIA_TITLE,
GST_TAG_ARTIST, PA_PROP_MEDIA_ARTIST,
@@ -1102,11 +1650,11 @@ gst_pulsesink_change_props (GstPulseSink * pulsesink, GstTagList * l)
/* We might add more here later on ... */
NULL
};
-
pa_proplist *pl = NULL;
const gchar *const *t;
gboolean empty = TRUE;
pa_operation *o = NULL;
+ GstPulseRingBuffer *pbuf;
pl = pa_proplist_new ();
@@ -1123,44 +1671,61 @@ gst_pulsesink_change_props (GstPulseSink * pulsesink, GstTagList * l)
g_free (n);
}
}
-
if (empty)
goto finish;
- pa_threaded_mainloop_lock (pulsesink->mainloop);
+ pa_threaded_mainloop_lock (psink->mainloop);
+ pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer);
+ if (pbuf == NULL)
+ goto no_buffer;
- if (gst_pulsesink_is_dead (pulsesink))
- goto unlock;
+ if (gst_pulsering_is_dead (psink, pbuf))
+ goto server_dead;
- if (!(o =
- pa_stream_proplist_update (pulsesink->stream, PA_UPDATE_REPLACE, pl,
- NULL, NULL))) {
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
- ("pa_stream_proplist_update() failed: %s",
- pa_strerror (pa_context_errno (pulsesink->context))), (NULL));
- goto unlock;
- }
+ if (!(o = pa_stream_proplist_update (pbuf->stream, PA_UPDATE_REPLACE,
+ pl, NULL, NULL)))
+ goto update_failed;
/* We're not interested if this operation failed or not */
-
unlock:
if (o)
pa_operation_unref (o);
- pa_threaded_mainloop_unlock (pulsesink->mainloop);
+ pa_threaded_mainloop_unlock (psink->mainloop);
finish:
if (pl)
pa_proplist_free (pl);
+
+ return;
+
+ /* ERRORS */
+no_buffer:
+ {
+ GST_DEBUG_OBJECT (psink, "we have no ringbuffer");
+ goto unlock;
+ }
+server_dead:
+ {
+ GST_DEBUG_OBJECT (psink, "the server is dead");
+ goto unlock;
+ }
+update_failed:
+ {
+ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED,
+ ("pa_stream_proplist_update() failed: %s",
+ pa_strerror (pa_context_errno (pbuf->context))), (NULL));
+ goto unlock;
+ }
}
#endif
static gboolean
gst_pulsesink_event (GstBaseSink * sink, GstEvent * event)
{
- GstPulseSink *pulsesink = GST_PULSESINK (sink);
+ GstPulseSink *pulsesink = GST_PULSESINK_CAST (sink);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_TAG:{
@@ -1207,61 +1772,3 @@ gst_pulsesink_event (GstBaseSink * sink, GstEvent * event)
return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
}
-
-static void
-gst_pulsesink_pause (GstPulseSink * pulsesink, gboolean b)
-{
- pa_operation *o = NULL;
-
- pa_threaded_mainloop_lock (pulsesink->mainloop);
-
- if (gst_pulsesink_is_dead (pulsesink))
- goto unlock;
-
- if (!(o = pa_stream_cork (pulsesink->stream, b, NULL, NULL))) {
- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED,
- ("pa_stream_cork() failed: %s",
- pa_strerror (pa_context_errno (pulsesink->context))), (NULL));
- goto unlock;
- }
-
- while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) {
- if (gst_pulsesink_is_dead (pulsesink))
- goto unlock;
- pa_threaded_mainloop_wait (pulsesink->mainloop);
- }
- pulsesink->corked = b;
-
-unlock:
- if (o)
- pa_operation_unref (o);
-
- pa_threaded_mainloop_unlock (pulsesink->mainloop);
-}
-
-
-static GstStateChangeReturn
-gst_pulsesink_change_state (GstElement * element, GstStateChange transition)
-{
- GstStateChangeReturn res;
- GstPulseSink *this = GST_PULSESINK (element);
-
- switch (transition) {
- case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
- gst_pulsesink_pause (this, FALSE);
- break;
- default:
- break;
- }
-
- res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
-
- switch (transition) {
- case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
- gst_pulsesink_pause (this, TRUE);
- break;
- default:
- break;
- }
- return res;
-}
diff --git a/ext/pulse/pulsesink.h b/ext/pulse/pulsesink.h
index 9ec626cd..2f74ac3a 100644
--- a/ext/pulse/pulsesink.h
+++ b/ext/pulse/pulsesink.h
@@ -42,39 +42,31 @@ G_BEGIN_DECLS
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PULSESINK))
#define GST_IS_PULSESINK_CLASS(obj) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PULSESINK))
+#define GST_PULSESINK_CAST(obj) \
+ ((GstPulseSink *)(obj))
typedef struct _GstPulseSink GstPulseSink;
typedef struct _GstPulseSinkClass GstPulseSinkClass;
struct _GstPulseSink
{
- GstAudioSink sink;
+ GstBaseAudioSink sink;
gchar *server, *device, *stream_name;
+ gchar *device_description;
pa_threaded_mainloop *mainloop;
- pa_context *context;
- pa_stream *stream;
-
- pa_sample_spec sample_spec;
-
GstPulseProbe *probe;
gdouble volume;
gboolean volume_set;
-
- gchar *device_description;
-
- gboolean operation_success;
- gboolean did_reset, in_write;
- gboolean corked;
gint notify;
};
struct _GstPulseSinkClass
{
- GstAudioSinkClass parent_class;
+ GstBaseAudioSinkClass parent_class;
};
GType gst_pulsesink_get_type (void);