From 6bc6cafcc6086eb3072f4f1b4936393d9280fbb9 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 8 Apr 2009 13:52:41 +0200 Subject: pulsesink: rewrite pulsesink Derive from BaseAudioSink and implement our custom ringbuffer that maps to the internal pulseaudio ringbuffer. --- ext/pulse/pulsesink.c | 2109 ++++++++++++++++++++++++++++++------------------- ext/pulse/pulsesink.h | 18 +- 2 files changed, 1313 insertions(+), 814 deletions(-) diff --git a/ext/pulse/pulsesink.c b/ext/pulse/pulsesink.c index 5064f6f4..6fcd3378 100644 --- a/ext/pulse/pulsesink.c +++ b/ext/pulse/pulsesink.c @@ -1,7 +1,7 @@ -/* - * GStreamer pulseaudio plugin +/* GStreamer pulseaudio plugin * * Copyright (c) 2004-2008 Lennart Poettering + * (c) 2009 Wim Taymans * * gst-pulse is free software; you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as @@ -66,1034 +66,1582 @@ enum PROP_VOLUME }; -static void gst_pulsesink_destroy_stream (GstPulseSink * pulsesink); - -static void gst_pulsesink_destroy_context (GstPulseSink * pulsesink); +#define GST_TYPE_PULSERING_BUFFER \ + (gst_pulseringbuffer_get_type()) +#define GST_PULSERING_BUFFER(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PULSERING_BUFFER,GstPulseRingBuffer)) +#define GST_PULSERING_BUFFER_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PULSERING_BUFFER,GstPulseRingBufferClass)) +#define GST_PULSERING_BUFFER_GET_CLASS(obj) \ + (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_PULSERING_BUFFER, GstPulseRingBufferClass)) +#define GST_PULSERING_BUFFER_CAST(obj) \ + ((GstPulseRingBuffer *)obj) +#define GST_IS_PULSERING_BUFFER(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PULSERING_BUFFER)) +#define GST_IS_PULSERING_BUFFER_CLASS(klass)\ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PULSERING_BUFFER)) + +typedef struct _GstPulseRingBuffer GstPulseRingBuffer; +typedef struct _GstPulseRingBufferClass GstPulseRingBufferClass; + +/* We keep a custom ringbuffer that is backed up by data allocated by + * pulseaudio. We must also overide the commit function to write into + * pulseaudio memory instead. */ +struct _GstPulseRingBuffer +{ + GstRingBuffer object; -static void gst_pulsesink_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec); -static void gst_pulsesink_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec); -static void gst_pulsesink_finalize (GObject * object); + gchar *stream_name; -static gboolean gst_pulsesink_open (GstAudioSink * asink); + pa_context *context; + pa_stream *stream; -static gboolean gst_pulsesink_close (GstAudioSink * asink); + pa_sample_spec sample_spec; -static gboolean gst_pulsesink_prepare (GstAudioSink * asink, - GstRingBufferSpec * spec); -static gboolean gst_pulsesink_unprepare (GstAudioSink * asink); + gboolean corked; + gboolean in_commit; + gboolean paused; + guint required; +}; -static guint gst_pulsesink_write (GstAudioSink * asink, gpointer data, - guint length); -static guint gst_pulsesink_delay (GstAudioSink * asink); +struct _GstPulseRingBufferClass +{ + GstRingBufferClass parent_class; +}; -static void gst_pulsesink_reset (GstAudioSink * asink); +static void gst_pulseringbuffer_class_init (GstPulseRingBufferClass * klass); +static void gst_pulseringbuffer_init (GstPulseRingBuffer * ringbuffer, + GstPulseRingBufferClass * klass); +static void gst_pulseringbuffer_finalize (GObject * object); -static gboolean gst_pulsesink_event (GstBaseSink * sink, GstEvent * event); +static GstRingBufferClass *ring_parent_class = NULL; -static GstStateChangeReturn gst_pulsesink_change_state (GstElement * - element, GstStateChange transition); +static gboolean gst_pulseringbuffer_open_device (GstRingBuffer * buf); +static gboolean gst_pulseringbuffer_close_device (GstRingBuffer * buf); +static gboolean gst_pulseringbuffer_acquire (GstRingBuffer * buf, + GstRingBufferSpec * spec); +static gboolean gst_pulseringbuffer_release (GstRingBuffer * buf); +static gboolean gst_pulseringbuffer_start (GstRingBuffer * buf); +static gboolean gst_pulseringbuffer_pause (GstRingBuffer * buf); +static gboolean gst_pulseringbuffer_stop (GstRingBuffer * buf); +static gboolean gst_pulseringbuffer_activate (GstRingBuffer * buf, + gboolean active); +static guint gst_pulseringbuffer_commit (GstRingBuffer * buf, + guint64 * sample, guchar * data, gint in_samples, gint out_samples, + gint * accum); + +/* ringbuffer abstract base class */ +static GType +gst_pulseringbuffer_get_type (void) +{ + static GType ringbuffer_type = 0; + + if (!ringbuffer_type) { + static const GTypeInfo ringbuffer_info = { + sizeof (GstPulseRingBufferClass), + NULL, + NULL, + (GClassInitFunc) gst_pulseringbuffer_class_init, + NULL, + NULL, + sizeof (GstPulseRingBuffer), + 0, + (GInstanceInitFunc) gst_pulseringbuffer_init, + NULL + }; + + ringbuffer_type = + g_type_register_static (GST_TYPE_RING_BUFFER, "GstPulseSinkRingBuffer", + &ringbuffer_info, 0); + } + return ringbuffer_type; +} -static void gst_pulsesink_init_interfaces (GType type); +static void +gst_pulseringbuffer_class_init (GstPulseRingBufferClass * klass) +{ + GObjectClass *gobject_class; + GstObjectClass *gstobject_class; + GstRingBufferClass *gstringbuffer_class; + + gobject_class = (GObjectClass *) klass; + gstobject_class = (GstObjectClass *) klass; + gstringbuffer_class = (GstRingBufferClass *) klass; + + ring_parent_class = g_type_class_peek_parent (klass); + + gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_finalize); + + gstringbuffer_class->open_device = + GST_DEBUG_FUNCPTR (gst_pulseringbuffer_open_device); + gstringbuffer_class->close_device = + GST_DEBUG_FUNCPTR (gst_pulseringbuffer_close_device); + gstringbuffer_class->acquire = + GST_DEBUG_FUNCPTR (gst_pulseringbuffer_acquire); + gstringbuffer_class->release = + GST_DEBUG_FUNCPTR (gst_pulseringbuffer_release); + gstringbuffer_class->start = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start); + gstringbuffer_class->pause = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_pause); + gstringbuffer_class->resume = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start); + gstringbuffer_class->stop = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_stop); + + gstringbuffer_class->activate = + GST_DEBUG_FUNCPTR (gst_pulseringbuffer_activate); + gstringbuffer_class->commit = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_commit); +} -static gboolean gst_pulsesink_is_dead (GstPulseSink * pulsesink); +static void +gst_pulseringbuffer_init (GstPulseRingBuffer * pbuf, + GstPulseRingBufferClass * g_class) +{ + pbuf->stream_name = NULL; + pbuf->context = NULL; + pbuf->stream = NULL; -#if (G_BYTE_ORDER == G_LITTLE_ENDIAN) -# define ENDIANNESS "LITTLE_ENDIAN, BIG_ENDIAN" +#if HAVE_PULSE_0_9_13 + pa_sample_spec_init (&pbuf->sample_spec); #else -# define ENDIANNESS "BIG_ENDIAN, LITTLE_ENDIAN" + pbuf->sample_spec.format = PA_SAMPLE_INVALID; + pbuf->sample_spec.rate = 0; + pbuf->sample_spec.channels = 0; #endif -GST_IMPLEMENT_PULSEPROBE_METHODS (GstPulseSink, gst_pulsesink); -GST_BOILERPLATE_FULL (GstPulseSink, gst_pulsesink, GstAudioSink, - GST_TYPE_AUDIO_SINK, gst_pulsesink_init_interfaces); + pbuf->corked = TRUE; +} -static gboolean -gst_pulsesink_interface_supported (GstImplementsInterface * - iface, GType interface_type) +static void +gst_pulsering_destroy_stream (GstPulseRingBuffer * pbuf) { - GstPulseSink *this = GST_PULSESINK (iface); + if (pbuf->stream) { + pa_stream_disconnect (pbuf->stream); - if (interface_type == GST_TYPE_PROPERTY_PROBE && this->probe) - return TRUE; + /* Make sure we don't get any further callbacks */ + pa_stream_set_state_callback (pbuf->stream, NULL, NULL); + pa_stream_set_write_callback (pbuf->stream, NULL, NULL); + pa_stream_set_latency_update_callback (pbuf->stream, NULL, NULL); - return FALSE; + pa_stream_unref (pbuf->stream); + pbuf->stream = NULL; + } + + g_free (pbuf->stream_name); + pbuf->stream_name = NULL; } static void -gst_pulsesink_implements_interface_init (GstImplementsInterfaceClass * klass) +gst_pulsering_destroy_context (GstPulseRingBuffer * pbuf) { - klass->supported = gst_pulsesink_interface_supported; + gst_pulsering_destroy_stream (pbuf); + + if (pbuf->context) { + pa_context_disconnect (pbuf->context); + + /* Make sure we don't get any further callbacks */ + pa_context_set_state_callback (pbuf->context, NULL, NULL); + pa_context_set_subscribe_callback (pbuf->context, NULL, NULL); + + pa_context_unref (pbuf->context); + pbuf->context = NULL; + } } static void -gst_pulsesink_init_interfaces (GType type) +gst_pulseringbuffer_finalize (GObject * object) { - static const GInterfaceInfo implements_iface_info = { - (GInterfaceInitFunc) gst_pulsesink_implements_interface_init, - NULL, - NULL, - }; - static const GInterfaceInfo probe_iface_info = { - (GInterfaceInitFunc) gst_pulsesink_property_probe_interface_init, - NULL, - NULL, - }; + GstPulseRingBuffer *ringbuffer; - g_type_add_interface_static (type, GST_TYPE_IMPLEMENTS_INTERFACE, - &implements_iface_info); - g_type_add_interface_static (type, GST_TYPE_PROPERTY_PROBE, - &probe_iface_info); + ringbuffer = GST_PULSERING_BUFFER_CAST (object); + + gst_pulsering_destroy_context (ringbuffer); + + G_OBJECT_CLASS (ring_parent_class)->finalize (object); +} + +static gboolean +gst_pulsering_is_dead (GstPulseSink * psink, GstPulseRingBuffer * pbuf) +{ + if (!pbuf->context + || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pbuf->context)) + || !pbuf->stream + || !PA_STREAM_IS_GOOD (pa_stream_get_state (pbuf->stream))) { + const gchar *err_str = pbuf->context ? + pa_strerror (pa_context_errno (pbuf->context)) : NULL; + + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected: %s", + err_str), (NULL)); + return TRUE; + } + return FALSE; } static void -gst_pulsesink_base_init (gpointer g_class) +gst_pulsering_context_state_cb (pa_context * c, void *userdata) { + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + pa_context_state_t state; - static GstStaticPadTemplate pad_template = GST_STATIC_PAD_TEMPLATE ("sink", - GST_PAD_SINK, - GST_PAD_ALWAYS, - GST_STATIC_CAPS ("audio/x-raw-int, " - "endianness = (int) { " ENDIANNESS " }, " - "signed = (boolean) TRUE, " - "width = (int) 16, " - "depth = (int) 16, " - "rate = (int) [ 1, MAX ], " - "channels = (int) [ 1, 32 ];" - "audio/x-raw-float, " - "endianness = (int) { " ENDIANNESS " }, " - "width = (int) 32, " - "rate = (int) [ 1, MAX ], " - "channels = (int) [ 1, 32 ];" - "audio/x-raw-int, " - "endianness = (int) { " ENDIANNESS " }, " - "signed = (boolean) TRUE, " - "width = (int) 32, " - "depth = (int) 32, " - "rate = (int) [ 1, MAX ], " - "channels = (int) [ 1, 32 ];" - "audio/x-raw-int, " - "signed = (boolean) FALSE, " - "width = (int) 8, " - "depth = (int) 8, " - "rate = (int) [ 1, MAX ], " - "channels = (int) [ 1, 32 ];" - "audio/x-alaw, " - "rate = (int) [ 1, MAX], " - "channels = (int) [ 1, 32 ];" - "audio/x-mulaw, " - "rate = (int) [ 1, MAX], " "channels = (int) [ 1, 32 ]") - ); + pbuf = GST_PULSERING_BUFFER_CAST (userdata); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); - GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); + state = pa_context_get_state (c); + GST_LOG_OBJECT (psink, "got new context state %d", state); - gst_element_class_set_details_simple (element_class, - "PulseAudio Audio Sink", - "Sink/Audio", "Plays audio to a PulseAudio server", "Lennart Poettering"); - gst_element_class_add_pad_template (element_class, - gst_static_pad_template_get (&pad_template)); + switch (state) { + case PA_CONTEXT_READY: + case PA_CONTEXT_TERMINATED: + case PA_CONTEXT_FAILED: + GST_LOG_OBJECT (psink, "signaling"); + pa_threaded_mainloop_signal (psink->mainloop, 0); + break; + + case PA_CONTEXT_UNCONNECTED: + case PA_CONTEXT_CONNECTING: + case PA_CONTEXT_AUTHORIZING: + case PA_CONTEXT_SETTING_NAME: + break; + } } +#if HAVE_PULSE_0_9_12 static void -gst_pulsesink_class_init (GstPulseSinkClass * klass) +gst_pulsering_context_subscribe_cb (pa_context * c, + pa_subscription_event_type_t t, uint32_t idx, void *userdata) { - GObjectClass *gobject_class = G_OBJECT_CLASS (klass); - GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); - GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass); - GstAudioSinkClass *gstaudiosink_class = GST_AUDIO_SINK_CLASS (klass); + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; - gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_pulsesink_finalize); - gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_pulsesink_set_property); - gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_pulsesink_get_property); + pbuf = GST_PULSERING_BUFFER_CAST (userdata); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); - gstelement_class->change_state = - GST_DEBUG_FUNCPTR (gst_pulsesink_change_state); + if (t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_CHANGE) && + t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_NEW)) + return; - gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_pulsesink_event); + if (!psink->stream) + return; - gstaudiosink_class->open = GST_DEBUG_FUNCPTR (gst_pulsesink_open); - gstaudiosink_class->close = GST_DEBUG_FUNCPTR (gst_pulsesink_close); - gstaudiosink_class->prepare = GST_DEBUG_FUNCPTR (gst_pulsesink_prepare); - gstaudiosink_class->unprepare = GST_DEBUG_FUNCPTR (gst_pulsesink_unprepare); - gstaudiosink_class->write = GST_DEBUG_FUNCPTR (gst_pulsesink_write); - gstaudiosink_class->delay = GST_DEBUG_FUNCPTR (gst_pulsesink_delay); - gstaudiosink_class->reset = GST_DEBUG_FUNCPTR (gst_pulsesink_reset); + if (idx != pa_stream_get_index (pbuf->stream)) + return; - /* Overwrite GObject fields */ - g_object_class_install_property (gobject_class, - PROP_SERVER, - g_param_spec_string ("server", "Server", - "The PulseAudio server to connect to", NULL, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_DEVICE, - g_param_spec_string ("device", "Sink", - "The PulseAudio sink device to connect to", NULL, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, - PROP_DEVICE_NAME, - g_param_spec_string ("device-name", "Device name", - "Human-readable name of the sound device", NULL, - G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); -#if HAVE_PULSE_0_9_12 - g_object_class_install_property (gobject_class, - PROP_VOLUME, - g_param_spec_double ("volume", "Volume", - "Volume of this stream", 0.0, 1000.0, 1.0, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); -#endif + /* Actually this event is also triggered when other properties of + * the stream change that are unrelated to the volume. However it is + * probably cheaper to signal the change here and check for the + * volume when the GObject property is read instead of querying it always. */ + + /* inform streaming thread to notify */ + g_atomic_int_compare_and_exchange (&psink->notify, 0, 1); } +#endif -static void -gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass) +/* will be called when the device should be opened. In this case we will connect + * to the server. We should not try to open any streams in this state. */ +static gboolean +gst_pulseringbuffer_open_device (GstRingBuffer * buf) { - int e; + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + gchar *name; + pa_mainloop_api *api; - pulsesink->server = pulsesink->device = pulsesink->stream_name = - pulsesink->device_description = NULL; + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf)); + pbuf = GST_PULSERING_BUFFER_CAST (buf); - pulsesink->context = NULL; - pulsesink->stream = NULL; + g_assert (!pbuf->context); + g_assert (!pbuf->stream); - pulsesink->volume = 1.0; - pulsesink->volume_set = FALSE; + name = gst_pulse_client_name (); -#if HAVE_PULSE_0_9_13 - pa_sample_spec_init (&pulsesink->sample_spec); -#else - pulsesink->sample_spec.format = PA_SAMPLE_INVALID; - pulsesink->sample_spec.rate = 0; - pulsesink->sample_spec.channels = 0; -#endif + pa_threaded_mainloop_lock (psink->mainloop); - pulsesink->operation_success = FALSE; - pulsesink->did_reset = FALSE; - pulsesink->in_write = FALSE; - pulsesink->notify = 0; + /* get the mainloop api and create a context */ + GST_LOG_OBJECT (psink, "new context with name %s", GST_STR_NULL (name)); + api = pa_threaded_mainloop_get_api (psink->mainloop); + if (!(pbuf->context = pa_context_new (api, name))) + goto create_failed; - pulsesink->mainloop = pa_threaded_mainloop_new (); - g_assert (pulsesink->mainloop); + /* register some essential callbacks */ + pa_context_set_state_callback (pbuf->context, + gst_pulsering_context_state_cb, pbuf); +#if HAVE_PULSE_0_9_12 + pa_context_set_subscribe_callback (psink->context, + gst_pulsering_context_subscribe_cb, pbuf); +#endif - e = pa_threaded_mainloop_start (pulsesink->mainloop); - g_assert (e == 0); + /* try to connect to the server and wait for completioni, we don't want to + * autospawn a deamon */ + GST_LOG_OBJECT (psink, "connect to server %s", GST_STR_NULL (psink->server)); + if (pa_context_connect (pbuf->context, psink->server, PA_CONTEXT_NOAUTOSPAWN, + NULL) < 0) + goto connect_failed; - pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink), G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device, TRUE, FALSE); /* TRUE for sinks, FALSE for sources */ -} + for (;;) { + pa_context_state_t state; -static void -gst_pulsesink_destroy_stream (GstPulseSink * pulsesink) -{ - if (pulsesink->stream) { - pa_stream_disconnect (pulsesink->stream); + state = pa_context_get_state (pbuf->context); - /* Make sure we don't get any further callbacks */ - pa_stream_set_state_callback (pulsesink->stream, NULL, NULL); - pa_stream_set_write_callback (pulsesink->stream, NULL, NULL); - pa_stream_set_latency_update_callback (pulsesink->stream, NULL, NULL); + GST_LOG_OBJECT (psink, "context state is now %d", state); - pa_stream_unref (pulsesink->stream); - pulsesink->stream = NULL; - } + if (!PA_CONTEXT_IS_GOOD (state)) + goto connect_failed; - g_free (pulsesink->stream_name); - pulsesink->stream_name = NULL; + if (state == PA_CONTEXT_READY) + break; - g_free (pulsesink->device_description); - pulsesink->device_description = NULL; -} + /* Wait until the context is ready */ + GST_LOG_OBJECT (psink, "waiting.."); + pa_threaded_mainloop_wait (psink->mainloop); + } -static void -gst_pulsesink_destroy_context (GstPulseSink * pulsesink) -{ + GST_LOG_OBJECT (psink, "opened the device"); - gst_pulsesink_destroy_stream (pulsesink); + pa_threaded_mainloop_unlock (psink->mainloop); + g_free (name); - if (pulsesink->context) { - pa_context_disconnect (pulsesink->context); + return TRUE; - /* Make sure we don't get any further callbacks */ - pa_context_set_state_callback (pulsesink->context, NULL, NULL); - pa_context_set_subscribe_callback (pulsesink->context, NULL, NULL); + /* ERRORS */ +unlock_and_fail: + { + gst_pulsering_destroy_context (pbuf); - pa_context_unref (pulsesink->context); - pulsesink->context = NULL; + pa_threaded_mainloop_unlock (psink->mainloop); + g_free (name); + return FALSE; + } +create_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("Failed to create context"), (NULL)); + goto unlock_and_fail; + } +connect_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Failed to connect: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock_and_fail; } } -static void -gst_pulsesink_finalize (GObject * object) +/* close the device */ +static gboolean +gst_pulseringbuffer_close_device (GstRingBuffer * buf) { - GstPulseSink *pulsesink = GST_PULSESINK (object); - - pa_threaded_mainloop_stop (pulsesink->mainloop); + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; - gst_pulsesink_destroy_context (pulsesink); + pbuf = GST_PULSERING_BUFFER_CAST (buf); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf)); - g_free (pulsesink->server); - g_free (pulsesink->device); + GST_LOG_OBJECT (psink, "closing device"); - pa_threaded_mainloop_free (pulsesink->mainloop); + pa_threaded_mainloop_lock (psink->mainloop); + gst_pulsering_destroy_context (pbuf); + pa_threaded_mainloop_unlock (psink->mainloop); - if (pulsesink->probe) { - gst_pulseprobe_free (pulsesink->probe); - pulsesink->probe = NULL; - } + GST_LOG_OBJECT (psink, "closed device"); - G_OBJECT_CLASS (parent_class)->finalize (object); + return TRUE; } -#if HAVE_PULSE_0_9_12 static void -gst_pulsesink_set_volume (GstPulseSink * pulsesink, gdouble volume) +gst_pulsering_stream_state_cb (pa_stream * s, void *userdata) { - pa_cvolume v; - pa_operation *o = NULL; - - pa_threaded_mainloop_lock (pulsesink->mainloop); + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + pa_stream_state_t state; - pulsesink->volume = volume; - pulsesink->volume_set = TRUE; + pbuf = GST_PULSERING_BUFFER_CAST (userdata); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); - if (!pulsesink->stream) - goto unlock; - - gst_pulse_cvolume_from_linear (&v, pulsesink->sample_spec.channels, volume); + state = pa_stream_get_state (s); + GST_LOG_OBJECT (psink, "got new stream state %d", state); - if (!(o = pa_context_set_sink_input_volume (pulsesink->context, - pa_stream_get_index (pulsesink->stream), &v, NULL, NULL))) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_set_sink_input_volume() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock; + switch (state) { + case PA_STREAM_READY: + case PA_STREAM_FAILED: + case PA_STREAM_TERMINATED: + GST_LOG_OBJECT (psink, "signaling"); + pa_threaded_mainloop_signal (psink->mainloop, 0); + break; + case PA_STREAM_UNCONNECTED: + case PA_STREAM_CREATING: + break; } +} - /* We don't really care about the result of this call */ +static void +gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata) +{ + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; -unlock: + pbuf = GST_PULSERING_BUFFER_CAST (userdata); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); - if (o) - pa_operation_unref (o); + GST_LOG_OBJECT (psink, "got request for length %" G_GSIZE_FORMAT, length); - pa_threaded_mainloop_unlock (pulsesink->mainloop); + if (pbuf->in_commit) { + pa_threaded_mainloop_signal (psink->mainloop, 0); + } } static void -gst_pulsesink_sink_input_info_cb (pa_context * c, const pa_sink_input_info * i, - int eol, void *userdata) +gst_pulsering_stream_latency_update_cb (pa_stream * s, void *userdata) { - GstPulseSink *pulsesink = GST_PULSESINK (userdata); - - if (!i) - return; + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; - if (!pulsesink->stream) - return; + pbuf = GST_PULSERING_BUFFER_CAST (userdata); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); - g_assert (i->index == pa_stream_get_index (pulsesink->stream)); + GST_LOG_OBJECT (psink, "got latency update callback"); - pulsesink->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume)); + pa_threaded_mainloop_signal (psink->mainloop, 0); } -static gdouble -gst_pulsesink_get_volume (GstPulseSink * pulsesink) +/* This method should create a new stream of the given @spec. No playback should + * start yet so we start in the corked state. */ +static gboolean +gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec) { + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + pa_buffer_attr buf_attr; + const pa_buffer_attr *buf_attr_ptr; + pa_channel_map channel_map; pa_operation *o = NULL; - gdouble v; + pa_cvolume v, *pv; + pa_stream_flags_t flags; + const gchar *name; - pa_threaded_mainloop_lock (pulsesink->mainloop); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf)); + pbuf = GST_PULSERING_BUFFER_CAST (buf); - if (!pulsesink->stream) - goto unlock; + GST_LOG_OBJECT (psink, "creating sample spec"); + /* convert the gstreamer sample spec to the pulseaudio format */ + if (!gst_pulse_fill_sample_spec (spec, &pbuf->sample_spec)) + goto invalid_spec; - if (!(o = pa_context_get_sink_input_info (pulsesink->context, - pa_stream_get_index (pulsesink->stream), - gst_pulsesink_sink_input_info_cb, pulsesink))) { + pa_threaded_mainloop_lock (psink->mainloop); - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_get_sink_input_info() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock; - } + /* we need a context and a no stream */ + g_assert (pbuf->context); + g_assert (!pbuf->stream); - while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { + /* enable event notifications */ + GST_LOG_OBJECT (psink, "subscribing to context events"); + if (!(o = pa_context_subscribe (pbuf->context, + PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL))) + goto subscribe_failed; - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock; - - pa_threaded_mainloop_wait (pulsesink->mainloop); - } + pa_operation_unref (o); -unlock: + /* initialize the channel map */ + gst_pulse_gst_to_channel_map (&channel_map, spec); + + /* find a good name for the stream */ + if (psink->stream_name) + name = psink->stream_name; + else + name = "Playback Stream"; + + /* create a stream */ + GST_LOG_OBJECT (psink, "creating stream with name %s", name); + if (!(pbuf->stream = pa_stream_new (pbuf->context, + name, &pbuf->sample_spec, &channel_map))) + goto stream_failed; + + /* install essential callbacks */ + pa_stream_set_state_callback (pbuf->stream, + gst_pulsering_stream_state_cb, pbuf); + pa_stream_set_write_callback (pbuf->stream, + gst_pulsering_stream_request_cb, pbuf); + pa_stream_set_latency_update_callback (pbuf->stream, + gst_pulsering_stream_latency_update_cb, pbuf); + + /* buffering requirements */ + memset (&buf_attr, 0, sizeof (buf_attr)); + buf_attr.tlength = spec->segtotal * spec->segsize; + buf_attr.maxlength = buf_attr.tlength * 2; + //buf_attr.prebuf = buf_attr.tlength; + buf_attr.prebuf = spec->segsize; + buf_attr.minreq = spec->segsize; - if (o) - pa_operation_unref (o); + GST_INFO_OBJECT (psink, "tlength: %d", buf_attr.tlength); + GST_INFO_OBJECT (psink, "maxlength: %d", buf_attr.maxlength); + GST_INFO_OBJECT (psink, "prebuf: %d", buf_attr.prebuf); + GST_INFO_OBJECT (psink, "minreq: %d", buf_attr.minreq); + + /* configure volume when we changed it, else we leave the default */ + if (psink->volume_set) { + GST_LOG_OBJECT (psink, "have volume of %f", psink->volume); + pv = &v; + gst_pulse_cvolume_from_linear (pv, pbuf->sample_spec.channels, + psink->volume); + } else { + pv = NULL; + } - v = pulsesink->volume; + /* construct the flags */ + flags = PA_STREAM_INTERPOLATE_TIMING | + PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_NOT_MONOTONOUS | +#if HAVE_PULSE_0_9_11 + PA_STREAM_ADJUST_LATENCY | +#endif + PA_STREAM_START_CORKED; - pa_threaded_mainloop_unlock (pulsesink->mainloop); + /* we always start corked (see flags above) */ + pbuf->corked = TRUE; - return v; -} -#endif + /* try to connect now */ + GST_LOG_OBJECT (psink, "connect for playback to device %s", + GST_STR_NULL (psink->device)); + if (pa_stream_connect_playback (pbuf->stream, psink->device, + &buf_attr, flags, pv, NULL) < 0) + goto connect_failed; -static gboolean -gst_pulsesink_is_dead (GstPulseSink * pulsesink) -{ + for (;;) { + pa_stream_state_t state; - if (!pulsesink->context - || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pulsesink->context)) - || !pulsesink->stream - || !PA_STREAM_IS_GOOD (pa_stream_get_state (pulsesink->stream))) { - const gchar *err_str = pulsesink->context ? - pa_strerror (pa_context_errno (pulsesink->context)) : NULL; + state = pa_stream_get_state (pbuf->stream); - GST_ELEMENT_ERROR ((pulsesink), RESOURCE, FAILED, ("Disconnected: %s", - err_str), (NULL)); - return TRUE; - } + GST_LOG_OBJECT (psink, "stream state is now %d", state); - return FALSE; -} + if (!PA_STREAM_IS_GOOD (state)) + goto connect_failed; -static void -gst_pulsesink_sink_info_cb (pa_context * c, const pa_sink_info * i, int eol, - void *userdata) -{ - GstPulseSink *pulsesink = GST_PULSESINK (userdata); + if (state == PA_STREAM_READY) + break; - if (!i) - return; + /* Wait until the stream is ready */ + pa_threaded_mainloop_wait (psink->mainloop); + } - if (!pulsesink->stream) - return; + GST_LOG_OBJECT (psink, "stream is acquired now"); - g_assert (i->index == pa_stream_get_device_index (pulsesink->stream)); + /* get the actual buffering properties now */ + buf_attr_ptr = pa_stream_get_buffer_attr (pbuf->stream); - g_free (pulsesink->device_description); - pulsesink->device_description = g_strdup (i->description); -} + GST_INFO_OBJECT (psink, "tlength: %d", buf_attr_ptr->tlength); + GST_INFO_OBJECT (psink, "maxlength: %d", buf_attr_ptr->maxlength); + GST_INFO_OBJECT (psink, "prebuf: %d", buf_attr_ptr->prebuf); + GST_INFO_OBJECT (psink, "minreq: %d", buf_attr_ptr->minreq); -static gchar * -gst_pulsesink_device_description (GstPulseSink * pulsesink) -{ - pa_operation *o = NULL; - gchar *t; + spec->segsize = buf_attr.minreq; + spec->segtotal = buf_attr.tlength / spec->segsize; - pa_threaded_mainloop_lock (pulsesink->mainloop); + pa_threaded_mainloop_unlock (psink->mainloop); - if (!pulsesink->stream) - goto unlock; + return TRUE; - if (!(o = pa_context_get_sink_info_by_index (pulsesink->context, - pa_stream_get_device_index (pulsesink->stream), - gst_pulsesink_sink_info_cb, pulsesink))) { + /* ERRORS */ +unlock_and_fail: + { + gst_pulsering_destroy_stream (pbuf); + pa_threaded_mainloop_unlock (psink->mainloop); - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_get_sink_info() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock; + return FALSE; } +invalid_spec: + { + GST_ELEMENT_ERROR (psink, RESOURCE, SETTINGS, + ("Invalid sample specification."), (NULL)); + return FALSE; + } +subscribe_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_context_subscribe() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock_and_fail; + } +stream_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("Failed to create stream: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock_and_fail; + } +connect_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("Failed to connect stream: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock_and_fail; + } +} - while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { - - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock; +/* free the stream that we acquired before */ +static gboolean +gst_pulseringbuffer_release (GstRingBuffer * buf) +{ + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; - pa_threaded_mainloop_wait (pulsesink->mainloop); - } + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf)); + pbuf = GST_PULSERING_BUFFER_CAST (buf); -unlock: + pa_threaded_mainloop_lock (psink->mainloop); + gst_pulsering_destroy_stream (pbuf); + pa_threaded_mainloop_unlock (psink->mainloop); - if (o) - pa_operation_unref (o); + return TRUE; +} - t = g_strdup (pulsesink->device_description); +/* this method should start the thread that starts pulling data. Usually only + * used in pull-based scheduling */ +static gboolean +gst_pulseringbuffer_activate (GstRingBuffer * buf, gboolean active) +{ + GstPulseSink *psink; - pa_threaded_mainloop_unlock (pulsesink->mainloop); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf)); - return t; + return TRUE; } -static void -gst_pulsesink_set_property (GObject * object, - guint prop_id, const GValue * value, GParamSpec * pspec) +/* update the corked state of a stream, must be called with the mainloop + * lock */ +static gboolean +gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked) { - GstPulseSink *pulsesink = GST_PULSESINK (object); + pa_operation *o = NULL; + GstPulseSink *psink; + gboolean res = FALSE; - switch (prop_id) { - case PROP_SERVER: - g_free (pulsesink->server); - pulsesink->server = g_value_dup_string (value); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); - if (pulsesink->probe) - gst_pulseprobe_set_server (pulsesink->probe, pulsesink->server); + GST_DEBUG_OBJECT (psink, "setting corked state to %d", corked); + if (pbuf->corked != corked) { + if (!(o = pa_stream_cork (pbuf->stream, corked, NULL, NULL))) + goto cork_failed; - break; + while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { + pa_threaded_mainloop_wait (psink->mainloop); + if (gst_pulsering_is_dead (psink, pbuf)) + goto server_dead; + } + pbuf->corked = corked; + } + res = TRUE; - case PROP_DEVICE: - g_free (pulsesink->device); - pulsesink->device = g_value_dup_string (value); - break; +cleanup: + if (o) + pa_operation_unref (o); -#if HAVE_PULSE_0_9_12 - case PROP_VOLUME: - gst_pulsesink_set_volume (pulsesink, g_value_get_double (value)); - break; -#endif + return res; - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; + /* ERRORS */ +server_dead: + { + GST_DEBUG_OBJECT (psink, "the server is dead"); + goto cleanup; + } +cork_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_stream_cork() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto cleanup; } } -static void -gst_pulsesink_get_property (GObject * object, - guint prop_id, GValue * value, GParamSpec * pspec) +/* start/resume playback ASAP */ +static gboolean +gst_pulseringbuffer_start (GstRingBuffer * buf) { + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + gboolean res; - GstPulseSink *pulsesink = GST_PULSESINK (object); - - switch (prop_id) { - case PROP_SERVER: - g_value_set_string (value, pulsesink->server); - break; - - case PROP_DEVICE: - g_value_set_string (value, pulsesink->device); - break; + pbuf = GST_PULSERING_BUFFER_CAST (buf); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); - case PROP_DEVICE_NAME:{ - char *t = gst_pulsesink_device_description (pulsesink); - g_value_set_string (value, t); - g_free (t); - break; - } + GST_DEBUG_OBJECT (psink, "uncorking"); + pa_threaded_mainloop_lock (psink->mainloop); + pbuf->paused = FALSE; + res = gst_pulsering_set_corked (pbuf, FALSE); + pa_threaded_mainloop_unlock (psink->mainloop); -#if HAVE_PULSE_0_9_12 - case PROP_VOLUME: - g_value_set_double (value, gst_pulsesink_get_volume (pulsesink)); - break; -#endif + return res; +} - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; +/* pause/stop playback ASAP */ +static gboolean +gst_pulseringbuffer_pause (GstRingBuffer * buf) +{ + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + gboolean res; + + pbuf = GST_PULSERING_BUFFER_CAST (buf); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + + GST_DEBUG_OBJECT (psink, "corking"); + pa_threaded_mainloop_lock (psink->mainloop); + /* make sure the commit method stops writing */ + pbuf->paused = TRUE; + res = gst_pulsering_set_corked (pbuf, TRUE); + if (pbuf->in_commit) { + /* we are waiting in a commit, signal */ + GST_DEBUG_OBJECT (psink, "signal commit"); + pa_threaded_mainloop_signal (psink->mainloop, 0); } + pa_threaded_mainloop_unlock (psink->mainloop); + + return res; } static void -gst_pulsesink_context_state_cb (pa_context * c, void *userdata) +gst_pulsering_success_cb (pa_stream * s, int success, void *userdata) { - GstPulseSink *pulsesink = GST_PULSESINK (userdata); + GstPulseRingBuffer *pbuf; + GstPulseSink *psink; - switch (pa_context_get_state (c)) { - case PA_CONTEXT_READY: - case PA_CONTEXT_TERMINATED: - case PA_CONTEXT_FAILED: - pa_threaded_mainloop_signal (pulsesink->mainloop, 0); - break; + pbuf = GST_PULSERING_BUFFER_CAST (userdata); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); - case PA_CONTEXT_UNCONNECTED: - case PA_CONTEXT_CONNECTING: - case PA_CONTEXT_AUTHORIZING: - case PA_CONTEXT_SETTING_NAME: - break; - } + pa_threaded_mainloop_signal (psink->mainloop, 0); } -#if HAVE_PULSE_0_9_12 -static void -gst_pulsesink_context_subscribe_cb (pa_context * c, - pa_subscription_event_type_t t, uint32_t idx, void *userdata) +/* stop playback, we flush everything. */ +static gboolean +gst_pulseringbuffer_stop (GstRingBuffer * buf) { - GstPulseSink *pulsesink = GST_PULSESINK (userdata); - - if (t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_CHANGE) && - t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_NEW)) - return; + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + gboolean res = FALSE; + pa_operation *o = NULL; - if (!pulsesink->stream) - return; + pbuf = GST_PULSERING_BUFFER_CAST (buf); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); - if (idx != pa_stream_get_index (pulsesink->stream)) - return; - - /* Actually this event is also triggered when other properties of - * the stream change that are unrelated to the volume. However it is - * probably cheaper to signal the change here and check for the - * volume when the GObject property is read instead of querying it always. */ + pa_threaded_mainloop_lock (psink->mainloop); + pbuf->paused = TRUE; + /* Inform anyone waiting in _commit() call that it shall wakeup */ + if (pbuf->in_commit) { + GST_DEBUG_OBJECT (psink, "signal commit thread"); + pa_threaded_mainloop_signal (psink->mainloop, 0); + } - /* inform streaming thread to notify */ - g_atomic_int_compare_and_exchange (&pulsesink->notify, 0, 1); -} -#endif + /* then try to flush, it's not fatal when this fails */ + GST_DEBUG_OBJECT (psink, "flushing"); + if ((o = pa_stream_flush (pbuf->stream, gst_pulsering_success_cb, pbuf))) { + while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { + GST_DEBUG_OBJECT (psink, "wait for completion"); + pa_threaded_mainloop_wait (psink->mainloop); + if (gst_pulsering_is_dead (psink, pbuf)) + goto server_dead; + } + GST_DEBUG_OBJECT (psink, "flush completed"); + } -static void -gst_pulsesink_stream_state_cb (pa_stream * s, void *userdata) -{ - GstPulseSink *pulsesink = GST_PULSESINK (userdata); + res = TRUE; - switch (pa_stream_get_state (s)) { +cleanup: + if (o) { + pa_operation_cancel (o); + pa_operation_unref (o); + } + pa_threaded_mainloop_unlock (psink->mainloop); - case PA_STREAM_READY: - case PA_STREAM_FAILED: - case PA_STREAM_TERMINATED: - pa_threaded_mainloop_signal (pulsesink->mainloop, 0); - break; + return res; - case PA_STREAM_UNCONNECTED: - case PA_STREAM_CREATING: - break; + /* ERRORS */ +server_dead: + { + GST_DEBUG_OBJECT (psink, "the server is dead"); + goto cleanup; } } -static void -gst_pulsesink_stream_request_cb (pa_stream * s, size_t length, void *userdata) +/* in_samples >= out_samples, rate > 1.0 */ +#define FWD_UP_SAMPLES(s,se,d,de) \ +G_STMT_START { \ + guint8 *sb = s, *db = d; \ + while (s <= se && d < de) { \ + memcpy (d, s, bps); \ + s += bps; \ + *accum += outr; \ + if ((*accum << 1) >= inr) { \ + *accum -= inr; \ + d += bps; \ + } \ + } \ + in_samples -= (s - sb)/bps; \ + out_samples -= (d - db)/bps; \ + GST_DEBUG ("fwd_up end %d/%d",*accum,*toprocess); \ +} G_STMT_END + +/* out_samples > in_samples, for rates smaller than 1.0 */ +#define FWD_DOWN_SAMPLES(s,se,d,de) \ +G_STMT_START { \ + guint8 *sb = s, *db = d; \ + while (s <= se && d < de) { \ + memcpy (d, s, bps); \ + d += bps; \ + *accum += inr; \ + if ((*accum << 1) >= outr) { \ + *accum -= outr; \ + s += bps; \ + } \ + } \ + in_samples -= (s - sb)/bps; \ + out_samples -= (d - db)/bps; \ + GST_DEBUG ("fwd_down end %d/%d",*accum,*toprocess); \ +} G_STMT_END + +#define REV_UP_SAMPLES(s,se,d,de) \ +G_STMT_START { \ + guint8 *sb = se, *db = d; \ + while (s <= se && d < de) { \ + memcpy (d, se, bps); \ + se -= bps; \ + *accum += outr; \ + while ((*accum << 1) >= inr) { \ + *accum -= inr; \ + d += bps; \ + } \ + } \ + in_samples -= (sb - se)/bps; \ + out_samples -= (d - db)/bps; \ + GST_DEBUG ("rev_up end %d/%d",*accum,*toprocess); \ +} G_STMT_END + +#define REV_DOWN_SAMPLES(s,se,d,de) \ +G_STMT_START { \ + guint8 *sb = se, *db = d; \ + while (s <= se && d < de) { \ + memcpy (d, se, bps); \ + d += bps; \ + *accum += inr; \ + while ((*accum << 1) >= outr) { \ + *accum -= outr; \ + se -= bps; \ + } \ + } \ + in_samples -= (sb - se)/bps; \ + out_samples -= (d - db)/bps; \ + GST_DEBUG ("rev_down end %d/%d",*accum,*toprocess); \ +} G_STMT_END + + +/* our custom commit function because we write into the buffer of pulseaudio + * instead of keeping our own buffer */ +static guint +gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample, + guchar * data, gint in_samples, gint out_samples, gint * accum) { - GstPulseSink *pulsesink = GST_PULSESINK (userdata); + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + guint result; + guint bps; + guint8 *data_end; + gboolean reverse; + gint *toprocess; + gint inr, outr; + + pbuf = GST_PULSERING_BUFFER_CAST (buf); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); - pa_threaded_mainloop_signal (pulsesink->mainloop, 0); -} + /* FIXME post message rather than using a signal (as mixer interface) */ + if (g_atomic_int_compare_and_exchange (&psink->notify, 1, 0)) + g_object_notify (G_OBJECT (psink), "volume"); + + /* make sure the ringbuffer is started */ + if (G_UNLIKELY (g_atomic_int_get (&buf->state) != + GST_RING_BUFFER_STATE_STARTED)) { + /* see if we are allowed to start it */ + if (G_UNLIKELY (g_atomic_int_get (&buf->abidata.ABI.may_start) == FALSE)) + goto no_start; + + GST_DEBUG_OBJECT (buf, "start!"); + if (!gst_ring_buffer_start (buf)) + goto start_failed; + } -static void -gst_pulsesink_stream_latency_update_cb (pa_stream * s, void *userdata) -{ - GstPulseSink *pulsesink = GST_PULSESINK (userdata); + pa_threaded_mainloop_lock (psink->mainloop); + GST_DEBUG_OBJECT (psink, "entering commit"); + pbuf->in_commit = TRUE; - pa_threaded_mainloop_signal (pulsesink->mainloop, 0); -} + bps = buf->spec.bytes_per_sample; -static gboolean -gst_pulsesink_open (GstAudioSink * asink) -{ - GstPulseSink *pulsesink = GST_PULSESINK (asink); - gchar *name = gst_pulse_client_name (); + /* our toy resampler for trick modes */ + reverse = out_samples < 0; + out_samples = ABS (out_samples); - pa_threaded_mainloop_lock (pulsesink->mainloop); + if (in_samples >= out_samples) + toprocess = &in_samples; + else + toprocess = &out_samples; - g_assert (!pulsesink->context); - g_assert (!pulsesink->stream); + inr = in_samples - 1; + outr = out_samples - 1; - if (!(pulsesink->context = - pa_context_new (pa_threaded_mainloop_get_api (pulsesink->mainloop), - name))) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("Failed to create context"), (NULL)); - goto unlock_and_fail; - } + /* data_end points to the last sample we have to write, not past it. This is + * needed to properly handle reverse playback: it points to the last sample. */ + data_end = data + (bps * inr); - pa_context_set_state_callback (pulsesink->context, - gst_pulsesink_context_state_cb, pulsesink); -#if HAVE_PULSE_0_9_12 - pa_context_set_subscribe_callback (pulsesink->context, - gst_pulsesink_context_subscribe_cb, pulsesink); -#endif + if (pbuf->paused) + goto was_paused; - if (pa_context_connect (pulsesink->context, pulsesink->server, 0, NULL) < 0) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, ("Failed to connect: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; - } + while (*toprocess > 0) { + size_t avail; + guint towrite; + gint64 offset; - for (;;) { - pa_context_state_t state; + GST_LOG_OBJECT (psink, "need to write %d samples", *toprocess); + for (;;) { + if ((avail = pa_stream_writable_size (pbuf->stream)) == (size_t) - 1) + goto writable_size_failed; + + /* convert to samples, we can only deal with multiples of the + * sample size */ + avail /= bps; - state = pa_context_get_state (pulsesink->context); + /* We always try to satisfy a request for data */ + GST_LOG_OBJECT (psink, "writable samples %" G_GSIZE_FORMAT, avail); + if (avail > 0) + break; + + /* we can't write a single byte, wait a bit */ + GST_LOG_OBJECT (psink, "waiting for free space"); + pa_threaded_mainloop_wait (psink->mainloop); - if (!PA_CONTEXT_IS_GOOD (state)) { - GST_DEBUG_OBJECT (pulsesink, "Context state was not READY. Got: %d", - state); - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, ("Failed to connect: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; + if (pbuf->paused) + goto was_paused; } - if (state == PA_CONTEXT_READY) - break; + if (avail > out_samples) + avail = out_samples; + + GST_LOG_OBJECT (psink, "writing %d samples at offset %" G_GUINT64_FORMAT, + avail, *sample); + + offset = *sample * bps; + towrite = avail * bps; + + if (G_LIKELY (inr == outr && !reverse)) { + /* no rate conversion, simply write out the samples */ + if (pa_stream_write (pbuf->stream, data, towrite, NULL, offset, + PA_SEEK_ABSOLUTE) < 0) + goto write_failed; + + data += towrite; + in_samples -= avail; + out_samples -= avail; + } else { + guint8 *dest, *d, *d_end; + + /* we need to allocate a temporary buffer to resample the data into, + * FIXME, we should have a pulseaudio API to allocate this buffer for us + * from the shared memory. */ + dest = d = g_malloc (towrite); + d_end = d + towrite; + + if (!reverse) { + if (inr >= outr) + /* forward speed up */ + FWD_UP_SAMPLES (data, data_end, d, d_end); + else + /* forward slow down */ + FWD_DOWN_SAMPLES (data, data_end, d, d_end); + } else { + if (inr >= outr) + /* reverse speed up */ + REV_UP_SAMPLES (data, data_end, d, d_end); + else + /* reverse slow down */ + REV_DOWN_SAMPLES (data, data_end, d, d_end); + } + /* see what we have left to write */ + towrite = (d - dest); + if (pa_stream_write (pbuf->stream, dest, towrite, + g_free, offset, PA_SEEK_ABSOLUTE) < 0) + goto write_failed; - /* Wait until the context is ready */ - pa_threaded_mainloop_wait (pulsesink->mainloop); + avail = towrite / bps; + } + *sample += avail; } + /* we consumed all samples here */ + data = data_end + bps; - pa_threaded_mainloop_unlock (pulsesink->mainloop); - g_free (name); - return TRUE; + pbuf->in_commit = FALSE; + pa_threaded_mainloop_unlock (psink->mainloop); -unlock_and_fail: +done: + result = inr - ((data_end - data) / bps); + GST_LOG_OBJECT (psink, "wrote %d samples", result); - gst_pulsesink_destroy_context (pulsesink); + return result; - pa_threaded_mainloop_unlock (pulsesink->mainloop); - g_free (name); - return FALSE; + /* ERRORS */ +unlock_and_fail: + { + pbuf->in_commit = FALSE; + GST_LOG_OBJECT (psink, "we are reset"); + pa_threaded_mainloop_unlock (psink->mainloop); + goto done; + } +no_start: + { + GST_LOG_OBJECT (psink, "we can not start"); + return 0; + } +start_failed: + { + GST_LOG_OBJECT (psink, "failed to start the ringbuffer"); + return 0; + } +was_paused: + { + pbuf->in_commit = FALSE; + GST_LOG_OBJECT (psink, "we are paused"); + pa_threaded_mainloop_unlock (psink->mainloop); + goto done; + } +writable_size_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_stream_writable_size() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock_and_fail; + } +write_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_stream_write() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock_and_fail; + } } +static void gst_pulsesink_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_pulsesink_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); +static void gst_pulsesink_finalize (GObject * object); + +static gboolean gst_pulsesink_event (GstBaseSink * sink, GstEvent * event); + +static void gst_pulsesink_init_interfaces (GType type); + +#if (G_BYTE_ORDER == G_LITTLE_ENDIAN) +# define ENDIANNESS "LITTLE_ENDIAN, BIG_ENDIAN" +#else +# define ENDIANNESS "BIG_ENDIAN, LITTLE_ENDIAN" +#endif + +GST_IMPLEMENT_PULSEPROBE_METHODS (GstPulseSink, gst_pulsesink); +GST_BOILERPLATE_FULL (GstPulseSink, gst_pulsesink, GstBaseAudioSink, + GST_TYPE_BASE_AUDIO_SINK, gst_pulsesink_init_interfaces); + static gboolean -gst_pulsesink_close (GstAudioSink * asink) +gst_pulsesink_interface_supported (GstImplementsInterface * + iface, GType interface_type) { - GstPulseSink *pulsesink = GST_PULSESINK (asink); + GstPulseSink *this = GST_PULSESINK_CAST (iface); - pa_threaded_mainloop_lock (pulsesink->mainloop); - gst_pulsesink_destroy_context (pulsesink); - pa_threaded_mainloop_unlock (pulsesink->mainloop); + if (interface_type == GST_TYPE_PROPERTY_PROBE && this->probe) + return TRUE; - return TRUE; + return FALSE; } -static gboolean -gst_pulsesink_prepare (GstAudioSink * asink, GstRingBufferSpec * spec) +static void +gst_pulsesink_implements_interface_init (GstImplementsInterfaceClass * klass) { - pa_buffer_attr buf_attr; - pa_channel_map channel_map; - GstPulseSink *pulsesink = GST_PULSESINK (asink); - pa_operation *o = NULL; - pa_cvolume v; + klass->supported = gst_pulsesink_interface_supported; +} - if (!gst_pulse_fill_sample_spec (spec, &pulsesink->sample_spec)) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, SETTINGS, - ("Invalid sample specification."), (NULL)); - return FALSE; - } +static void +gst_pulsesink_init_interfaces (GType type) +{ + static const GInterfaceInfo implements_iface_info = { + (GInterfaceInitFunc) gst_pulsesink_implements_interface_init, + NULL, + NULL, + }; + static const GInterfaceInfo probe_iface_info = { + (GInterfaceInitFunc) gst_pulsesink_property_probe_interface_init, + NULL, + NULL, + }; - pa_threaded_mainloop_lock (pulsesink->mainloop); + g_type_add_interface_static (type, GST_TYPE_IMPLEMENTS_INTERFACE, + &implements_iface_info); + g_type_add_interface_static (type, GST_TYPE_PROPERTY_PROBE, + &probe_iface_info); +} - if (!pulsesink->context) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, ("Bad context"), (NULL)); - goto unlock_and_fail; - } +static void +gst_pulsesink_base_init (gpointer g_class) +{ + static GstStaticPadTemplate pad_template = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("audio/x-raw-int, " + "endianness = (int) { " ENDIANNESS " }, " + "signed = (boolean) TRUE, " + "width = (int) 16, " + "depth = (int) 16, " + "rate = (int) [ 1, MAX ], " + "channels = (int) [ 1, 32 ];" + "audio/x-raw-float, " + "endianness = (int) { " ENDIANNESS " }, " + "width = (int) 32, " + "rate = (int) [ 1, MAX ], " + "channels = (int) [ 1, 32 ];" + "audio/x-raw-int, " + "endianness = (int) { " ENDIANNESS " }, " + "signed = (boolean) TRUE, " + "width = (int) 32, " + "depth = (int) 32, " + "rate = (int) [ 1, MAX ], " + "channels = (int) [ 1, 32 ];" + "audio/x-raw-int, " + "signed = (boolean) FALSE, " + "width = (int) 8, " + "depth = (int) 8, " + "rate = (int) [ 1, MAX ], " + "channels = (int) [ 1, 32 ];" + "audio/x-alaw, " + "rate = (int) [ 1, MAX], " + "channels = (int) [ 1, 32 ];" + "audio/x-mulaw, " + "rate = (int) [ 1, MAX], " "channels = (int) [ 1, 32 ]") + ); - g_assert (!pulsesink->stream); + GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); - if (!(o = - pa_context_subscribe (pulsesink->context, - PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL))) { - const gchar *err_str = pulsesink->context ? - pa_strerror (pa_context_errno (pulsesink->context)) : NULL; + gst_element_class_set_details_simple (element_class, + "PulseAudio Audio Sink", + "Sink/Audio", "Plays audio to a PulseAudio server", "Lennart Poettering"); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&pad_template)); +} - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_context_subscribe() failed: %s", err_str), (NULL)); - goto unlock_and_fail; - } +static GstRingBuffer * +gst_pulsesink_create_ringbuffer (GstBaseAudioSink * sink) +{ + GstRingBuffer *buffer; - pa_operation_unref (o); + GST_DEBUG_OBJECT (sink, "creating ringbuffer"); + buffer = g_object_new (GST_TYPE_PULSERING_BUFFER, NULL); + GST_DEBUG_OBJECT (sink, "created ringbuffer @%p", buffer); - if (!(pulsesink->stream = pa_stream_new (pulsesink->context, - pulsesink->stream_name ? - pulsesink->stream_name : "Playback Stream", - &pulsesink->sample_spec, - gst_pulse_gst_to_channel_map (&channel_map, spec)))) { - const gchar *err_str = pulsesink->context ? - pa_strerror (pa_context_errno (pulsesink->context)) : NULL; + return buffer; +} - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("Failed to create stream: %s", err_str), (NULL)); - goto unlock_and_fail; - } +static void +gst_pulsesink_class_init (GstPulseSinkClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass); + GstBaseAudioSinkClass *gstaudiosink_class = GST_BASE_AUDIO_SINK_CLASS (klass); - pa_stream_set_state_callback (pulsesink->stream, - gst_pulsesink_stream_state_cb, pulsesink); - pa_stream_set_write_callback (pulsesink->stream, - gst_pulsesink_stream_request_cb, pulsesink); - pa_stream_set_latency_update_callback (pulsesink->stream, - gst_pulsesink_stream_latency_update_cb, pulsesink); + gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_pulsesink_finalize); + gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_pulsesink_set_property); + gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_pulsesink_get_property); - memset (&buf_attr, 0, sizeof (buf_attr)); - buf_attr.tlength = spec->segtotal * spec->segsize; - buf_attr.maxlength = buf_attr.tlength * 2; - buf_attr.prebuf = buf_attr.tlength; - buf_attr.minreq = spec->segsize; + gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_pulsesink_event); - if (pulsesink->volume_set) - gst_pulse_cvolume_from_linear (&v, pulsesink->sample_spec.channels, - pulsesink->volume); + gstaudiosink_class->create_ringbuffer = + GST_DEBUG_FUNCPTR (gst_pulsesink_create_ringbuffer); - if (pa_stream_connect_playback (pulsesink->stream, pulsesink->device, - &buf_attr, - PA_STREAM_INTERPOLATE_TIMING | - PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_NOT_MONOTONOUS | -#if HAVE_PULSE_0_9_11 - PA_STREAM_ADJUST_LATENCY | + /* Overwrite GObject fields */ + g_object_class_install_property (gobject_class, + PROP_SERVER, + g_param_spec_string ("server", "Server", + "The PulseAudio server to connect to", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_DEVICE, + g_param_spec_string ("device", "Sink", + "The PulseAudio sink device to connect to", NULL, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, + PROP_DEVICE_NAME, + g_param_spec_string ("device-name", "Device name", + "Human-readable name of the sound device", NULL, + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); +#if HAVE_PULSE_0_9_12 + g_object_class_install_property (gobject_class, + PROP_VOLUME, + g_param_spec_double ("volume", "Volume", + "Volume of this stream", 0.0, 1000.0, 1.0, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); #endif - PA_STREAM_START_CORKED, pulsesink->volume_set ? &v : NULL, NULL) < 0) { - - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("Failed to connect stream: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; - } - pulsesink->corked = TRUE; +} - for (;;) { - pa_stream_state_t state; +/* returns the current time of the sink ringbuffer */ +static GstClockTime +gst_pulse_sink_get_time (GstClock * clock, GstBaseAudioSink * sink) +{ + GstPulseSink *psink; + GstPulseRingBuffer *pbuf; + pa_usec_t time; - state = pa_stream_get_state (pulsesink->stream); + if (sink->ringbuffer == NULL || sink->ringbuffer->spec.rate == 0) + return GST_CLOCK_TIME_NONE; - if (!PA_STREAM_IS_GOOD (state)) { - GST_DEBUG_OBJECT (pulsesink, "Stream state was not READY. Got: %d", - state); - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("Failed to connect stream: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; - } + pbuf = GST_PULSERING_BUFFER_CAST (sink->ringbuffer); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); - if (state == PA_STREAM_READY) - break; + pa_threaded_mainloop_lock (psink->mainloop); + /* if we don't have enough data to get a timestamp, just return 0 */ + if (pa_stream_get_time (pbuf->stream, &time) < 0) + time = 0; + pa_threaded_mainloop_unlock (psink->mainloop); - /* Wait until the stream is ready */ - pa_threaded_mainloop_wait (pulsesink->mainloop); - } + time *= 1000; - pa_threaded_mainloop_unlock (pulsesink->mainloop); - return TRUE; + GST_LOG_OBJECT (psink, "current time is %" GST_TIME_FORMAT, + GST_TIME_ARGS (time)); -unlock_and_fail: + return time; +} - gst_pulsesink_destroy_stream (pulsesink); +static void +gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass) +{ + pulsesink->server = NULL; + pulsesink->device = NULL; + pulsesink->device_description = NULL; - pa_threaded_mainloop_unlock (pulsesink->mainloop); + pulsesink->volume = 1.0; + pulsesink->volume_set = FALSE; - return FALSE; -} + pulsesink->notify = 0; -static gboolean -gst_pulsesink_unprepare (GstAudioSink * asink) -{ - GstPulseSink *pulsesink = GST_PULSESINK (asink); + g_assert ((pulsesink->mainloop = pa_threaded_mainloop_new ())); + g_assert (pa_threaded_mainloop_start (pulsesink->mainloop) == 0); - pa_threaded_mainloop_lock (pulsesink->mainloop); - gst_pulsesink_destroy_stream (pulsesink); - pa_threaded_mainloop_unlock (pulsesink->mainloop); + pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink), G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device, TRUE, FALSE); /* TRUE for sinks, FALSE for sources */ - return TRUE; + /* override with a custom clock */ + if (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock) + gst_object_unref (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock); + GST_BASE_AUDIO_SINK (pulsesink)->provided_clock = + gst_audio_clock_new ("GstPulseSinkClock", + (GstAudioClockGetTimeFunc) gst_pulse_sink_get_time, pulsesink); } -static guint -gst_pulsesink_write (GstAudioSink * asink, gpointer data, guint length) +static void +gst_pulsesink_finalize (GObject * object) { - GstPulseSink *pulsesink = GST_PULSESINK (asink); - pa_operation *o = NULL; - size_t sum = 0; + GstPulseSink *pulsesink = GST_PULSESINK_CAST (object); - /* FIXME post message rather than using a signal (as mixer interface) */ - if (g_atomic_int_compare_and_exchange (&pulsesink->notify, 1, 0)) - g_object_notify (G_OBJECT (pulsesink), "volume"); + pa_threaded_mainloop_stop (pulsesink->mainloop); - pa_threaded_mainloop_lock (pulsesink->mainloop); + g_free (pulsesink->server); + g_free (pulsesink->device); - pulsesink->in_write = TRUE; + pa_threaded_mainloop_free (pulsesink->mainloop); - /* Make sure the stream is uncorked - it might not be on a caps change */ - if (pulsesink->corked) { - if (!(o = pa_stream_cork (pulsesink->stream, FALSE, NULL, NULL))) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_cork() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; - } + if (pulsesink->probe) { + gst_pulseprobe_free (pulsesink->probe); + pulsesink->probe = NULL; + } - while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock_and_fail; - pa_threaded_mainloop_wait (pulsesink->mainloop); - } - pulsesink->corked = FALSE; + G_OBJECT_CLASS (parent_class)->finalize (object); +} - pa_operation_unref (o); - o = NULL; - } +#if HAVE_PULSE_0_9_12 +static void +gst_pulsesink_set_volume (GstPulseSink * psink, gdouble volume) +{ + pa_cvolume v; + pa_operation *o = NULL; + GstPulseRingBuffer *pbuf; - while (length > 0) { - size_t l; + pa_threaded_mainloop_lock (psink->mainloop); - for (;;) { - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock_and_fail; - - if ((l = pa_stream_writable_size (pulsesink->stream)) == (size_t) - 1) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_writable_size() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; - } + psink->volume = volume; + psink->volume_set = TRUE; - if (l >= length) - break; + pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer); + if (pbuf == NULL) + goto unlock; - if (pulsesink->did_reset) - goto unlock_and_fail; + gst_pulse_cvolume_from_linear (&v, pbuf->sample_spec.channels, volume); - pa_threaded_mainloop_wait (pulsesink->mainloop); - } + if (!(o = pa_context_set_sink_input_volume (pbuf->context, + pa_stream_get_index (pbuf->stream), &v, NULL, NULL))) + goto volume_failed; + + /* We don't really care about the result of this call */ +unlock: - if (l > length) - l = length; + if (o) + pa_operation_unref (o); - if (pa_stream_write (pulsesink->stream, data, l, NULL, 0, - PA_SEEK_RELATIVE) < 0) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_write() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; - } + pa_threaded_mainloop_unlock (psink->mainloop); - data = (guint8 *) data + l; - length -= l; + return; - sum += l; + /* ERRORS */ +volume_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_stream_set_sink_input_volume() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock; } +} - pulsesink->did_reset = FALSE; - pulsesink->in_write = FALSE; +static void +gst_pulsesink_sink_input_info_cb (pa_context * c, const pa_sink_input_info * i, + int eol, void *userdata) +{ + GstPulseRingBuffer *pbuf; + GstPulseSink *psink; - pa_threaded_mainloop_unlock (pulsesink->mainloop); - return sum; + pbuf = GST_PULSERING_BUFFER_CAST (userdata); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); -unlock_and_fail: + if (!i) + return; - pulsesink->did_reset = FALSE; - pulsesink->in_write = FALSE; + if (!pbuf->stream) + return; - if (o) - pa_operation_unref (o); + g_assert (i->index == pa_stream_get_index (pbuf->stream)); - pa_threaded_mainloop_unlock (pulsesink->mainloop); - return (guint) - 1; + psink->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume)); } -static guint -gst_pulsesink_delay (GstAudioSink * asink) +static gdouble +gst_pulsesink_get_volume (GstPulseSink * psink) { - GstPulseSink *pulsesink = GST_PULSESINK (asink); - pa_usec_t t; - - pa_threaded_mainloop_lock (pulsesink->mainloop); + GstPulseRingBuffer *pbuf; + pa_operation *o = NULL; + gdouble v; - for (;;) { - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock_and_fail; + pa_threaded_mainloop_lock (psink->mainloop); - if (pa_stream_get_latency (pulsesink->stream, &t, NULL) >= 0) - break; + pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer); + if (pbuf == NULL || pbuf->stream == NULL) + goto no_buffer; - if (pa_context_errno (pulsesink->context) != PA_ERR_NODATA) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_get_latency() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; - } + if (!(o = pa_context_get_sink_input_info (pbuf->context, + pa_stream_get_index (pbuf->stream), + gst_pulsesink_sink_input_info_cb, pbuf))) + goto info_failed; - pa_threaded_mainloop_wait (pulsesink->mainloop); + while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { + pa_threaded_mainloop_wait (psink->mainloop); + if (gst_pulsering_is_dead (psink, pbuf)) + goto unlock; } - pa_threaded_mainloop_unlock (pulsesink->mainloop); +unlock: + if (o) + pa_operation_unref (o); - return gst_util_uint64_scale_int (t, pulsesink->sample_spec.rate, 1000000LL); + v = psink->volume; + pa_threaded_mainloop_unlock (psink->mainloop); -unlock_and_fail: + return v; - pa_threaded_mainloop_unlock (pulsesink->mainloop); - return 0; + /* ERRORS */ +no_buffer: + { + GST_DEBUG_OBJECT (psink, "we have no ringbuffer"); + goto unlock; + } +info_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_stream_get_sink_input_info() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock; + } } +#endif static void -gst_pulsesink_success_cb (pa_stream * s, int success, void *userdata) +gst_pulsesink_sink_info_cb (pa_context * c, const pa_sink_info * i, int eol, + void *userdata) { - GstPulseSink *pulsesink = GST_PULSESINK (userdata); + GstPulseRingBuffer *pbuf; + GstPulseSink *psink; + + pbuf = GST_PULSERING_BUFFER_CAST (userdata); + psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + + if (!i) + return; + + if (!pbuf->stream) + return; + + g_assert (i->index == pa_stream_get_device_index (pbuf->stream)); - pulsesink->operation_success = !!success; - pa_threaded_mainloop_signal (pulsesink->mainloop, 0); + g_free (psink->device_description); + psink->device_description = g_strdup (i->description); } -static void -gst_pulsesink_reset (GstAudioSink * asink) +static gchar * +gst_pulsesink_device_description (GstPulseSink * psink) { - GstPulseSink *pulsesink = GST_PULSESINK (asink); + GstPulseRingBuffer *pbuf; pa_operation *o = NULL; + gchar *t; - pa_threaded_mainloop_lock (pulsesink->mainloop); + pa_threaded_mainloop_lock (psink->mainloop); + pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer); + if (pbuf == NULL || pbuf->stream == NULL) + goto no_buffer; - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock_and_fail; + if (!(o = pa_context_get_sink_info_by_index (pbuf->context, + pa_stream_get_device_index (pbuf->stream), + gst_pulsesink_sink_info_cb, pbuf))) + goto info_failed; - if (!(o = - pa_stream_flush (pulsesink->stream, gst_pulsesink_success_cb, - pulsesink))) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_flush() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; + while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { + pa_threaded_mainloop_wait (psink->mainloop); + if (gst_pulsering_is_dead (psink, pbuf)) + goto unlock; } - /* Inform anyone waiting in _write() call that it shall wakeup */ - if (pulsesink->in_write) { - pulsesink->did_reset = TRUE; - pa_threaded_mainloop_signal (pulsesink->mainloop, 0); - } +unlock: + if (o) + pa_operation_unref (o); - pulsesink->operation_success = FALSE; - while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { + t = g_strdup (psink->device_description); + pa_threaded_mainloop_unlock (psink->mainloop); - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock_and_fail; + return t; - pa_threaded_mainloop_wait (pulsesink->mainloop); + /* ERRORS */ +no_buffer: + { + GST_DEBUG_OBJECT (psink, "we have no ringbuffer"); + goto unlock; } - - if (!pulsesink->operation_success) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, ("Flush failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock_and_fail; +info_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_stream_get_sink_info() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock; } +} -unlock_and_fail: +static void +gst_pulsesink_set_property (GObject * object, + guint prop_id, const GValue * value, GParamSpec * pspec) +{ + GstPulseSink *pulsesink = GST_PULSESINK_CAST (object); - if (o) { - pa_operation_cancel (o); - pa_operation_unref (o); + switch (prop_id) { + case PROP_SERVER: + g_free (pulsesink->server); + pulsesink->server = g_value_dup_string (value); + if (pulsesink->probe) + gst_pulseprobe_set_server (pulsesink->probe, pulsesink->server); + break; + case PROP_DEVICE: + g_free (pulsesink->device); + pulsesink->device = g_value_dup_string (value); + break; +#if HAVE_PULSE_0_9_12 + case PROP_VOLUME: + gst_pulsesink_set_volume (pulsesink, g_value_get_double (value)); + break; +#endif + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; } +} + +static void +gst_pulsesink_get_property (GObject * object, + guint prop_id, GValue * value, GParamSpec * pspec) +{ - pa_threaded_mainloop_unlock (pulsesink->mainloop); + GstPulseSink *pulsesink = GST_PULSESINK_CAST (object); + + switch (prop_id) { + case PROP_SERVER: + g_value_set_string (value, pulsesink->server); + break; + case PROP_DEVICE: + g_value_set_string (value, pulsesink->device); + break; + case PROP_DEVICE_NAME:{ + char *t = gst_pulsesink_device_description (pulsesink); + g_value_set_string (value, t); + g_free (t); + break; + } +#if HAVE_PULSE_0_9_12 + case PROP_VOLUME: + g_value_set_double (value, gst_pulsesink_get_volume (pulsesink)); + break; +#endif + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } } static void -gst_pulsesink_change_title (GstPulseSink * pulsesink, const gchar * t) +gst_pulsesink_change_title (GstPulseSink * psink, const gchar * t) { pa_operation *o = NULL; + GstPulseRingBuffer *pbuf; - pa_threaded_mainloop_lock (pulsesink->mainloop); + pa_threaded_mainloop_lock (psink->mainloop); - g_free (pulsesink->stream_name); - pulsesink->stream_name = g_strdup (t); + pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer); + if (pbuf == NULL) + goto no_buffer; - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock; + g_free (pbuf->stream_name); + pbuf->stream_name = g_strdup (t); - if (!(o = - pa_stream_set_name (pulsesink->stream, pulsesink->stream_name, NULL, - NULL))) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_set_name() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock; - } + if (gst_pulsering_is_dead (psink, pbuf)) + goto server_dead; - /* We're not interested if this operation failed or not */ + if (!(o = pa_stream_set_name (pbuf->stream, pbuf->stream_name, NULL, NULL))) + goto name_failed; + /* We're not interested if this operation failed or not */ unlock: - if (o) pa_operation_unref (o); + pa_threaded_mainloop_unlock (psink->mainloop); - pa_threaded_mainloop_unlock (pulsesink->mainloop); + return; + + /* ERRORS */ +no_buffer: + { + GST_DEBUG_OBJECT (psink, "we have no ringbuffer"); + goto unlock; + } +server_dead: + { + GST_DEBUG_OBJECT (psink, "the server is dead"); + goto unlock; + } +name_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_stream_set_name() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock; + } } #if HAVE_PULSE_0_9_11 static void -gst_pulsesink_change_props (GstPulseSink * pulsesink, GstTagList * l) +gst_pulsesink_change_props (GstPulseSink * psink, GstTagList * l) { - static const gchar *const map[] = { GST_TAG_TITLE, PA_PROP_MEDIA_TITLE, GST_TAG_ARTIST, PA_PROP_MEDIA_ARTIST, @@ -1102,11 +1650,11 @@ gst_pulsesink_change_props (GstPulseSink * pulsesink, GstTagList * l) /* We might add more here later on ... */ NULL }; - pa_proplist *pl = NULL; const gchar *const *t; gboolean empty = TRUE; pa_operation *o = NULL; + GstPulseRingBuffer *pbuf; pl = pa_proplist_new (); @@ -1123,44 +1671,61 @@ gst_pulsesink_change_props (GstPulseSink * pulsesink, GstTagList * l) g_free (n); } } - if (empty) goto finish; - pa_threaded_mainloop_lock (pulsesink->mainloop); + pa_threaded_mainloop_lock (psink->mainloop); + pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer); + if (pbuf == NULL) + goto no_buffer; - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock; + if (gst_pulsering_is_dead (psink, pbuf)) + goto server_dead; - if (!(o = - pa_stream_proplist_update (pulsesink->stream, PA_UPDATE_REPLACE, pl, - NULL, NULL))) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_proplist_update() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock; - } + if (!(o = pa_stream_proplist_update (pbuf->stream, PA_UPDATE_REPLACE, + pl, NULL, NULL))) + goto update_failed; /* We're not interested if this operation failed or not */ - unlock: if (o) pa_operation_unref (o); - pa_threaded_mainloop_unlock (pulsesink->mainloop); + pa_threaded_mainloop_unlock (psink->mainloop); finish: if (pl) pa_proplist_free (pl); + + return; + + /* ERRORS */ +no_buffer: + { + GST_DEBUG_OBJECT (psink, "we have no ringbuffer"); + goto unlock; + } +server_dead: + { + GST_DEBUG_OBJECT (psink, "the server is dead"); + goto unlock; + } +update_failed: + { + GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, + ("pa_stream_proplist_update() failed: %s", + pa_strerror (pa_context_errno (pbuf->context))), (NULL)); + goto unlock; + } } #endif static gboolean gst_pulsesink_event (GstBaseSink * sink, GstEvent * event) { - GstPulseSink *pulsesink = GST_PULSESINK (sink); + GstPulseSink *pulsesink = GST_PULSESINK_CAST (sink); switch (GST_EVENT_TYPE (event)) { case GST_EVENT_TAG:{ @@ -1207,61 +1772,3 @@ gst_pulsesink_event (GstBaseSink * sink, GstEvent * event) return GST_BASE_SINK_CLASS (parent_class)->event (sink, event); } - -static void -gst_pulsesink_pause (GstPulseSink * pulsesink, gboolean b) -{ - pa_operation *o = NULL; - - pa_threaded_mainloop_lock (pulsesink->mainloop); - - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock; - - if (!(o = pa_stream_cork (pulsesink->stream, b, NULL, NULL))) { - GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, - ("pa_stream_cork() failed: %s", - pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); - goto unlock; - } - - while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { - if (gst_pulsesink_is_dead (pulsesink)) - goto unlock; - pa_threaded_mainloop_wait (pulsesink->mainloop); - } - pulsesink->corked = b; - -unlock: - if (o) - pa_operation_unref (o); - - pa_threaded_mainloop_unlock (pulsesink->mainloop); -} - - -static GstStateChangeReturn -gst_pulsesink_change_state (GstElement * element, GstStateChange transition) -{ - GstStateChangeReturn res; - GstPulseSink *this = GST_PULSESINK (element); - - switch (transition) { - case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - gst_pulsesink_pause (this, FALSE); - break; - default: - break; - } - - res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); - - switch (transition) { - case GST_STATE_CHANGE_PLAYING_TO_PAUSED: - gst_pulsesink_pause (this, TRUE); - break; - default: - break; - } - return res; -} diff --git a/ext/pulse/pulsesink.h b/ext/pulse/pulsesink.h index 9ec626cd..2f74ac3a 100644 --- a/ext/pulse/pulsesink.h +++ b/ext/pulse/pulsesink.h @@ -42,39 +42,31 @@ G_BEGIN_DECLS (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PULSESINK)) #define GST_IS_PULSESINK_CLASS(obj) \ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PULSESINK)) +#define GST_PULSESINK_CAST(obj) \ + ((GstPulseSink *)(obj)) typedef struct _GstPulseSink GstPulseSink; typedef struct _GstPulseSinkClass GstPulseSinkClass; struct _GstPulseSink { - GstAudioSink sink; + GstBaseAudioSink sink; gchar *server, *device, *stream_name; + gchar *device_description; pa_threaded_mainloop *mainloop; - pa_context *context; - pa_stream *stream; - - pa_sample_spec sample_spec; - GstPulseProbe *probe; gdouble volume; gboolean volume_set; - - gchar *device_description; - - gboolean operation_success; - gboolean did_reset, in_write; - gboolean corked; gint notify; }; struct _GstPulseSinkClass { - GstAudioSinkClass parent_class; + GstBaseAudioSinkClass parent_class; }; GType gst_pulsesink_get_type (void); -- cgit