From 2d88251d9d90aafab7e6e953a6627530e681e2de Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 28 Jul 2009 18:29:07 +0200 Subject: pulsesrc: cleanups Keep track of the paused state of the source and leave the read function when paused. don't wait for a latency update when the delay is not yet known but simply return 0 instead of blocking. Keep track of the corked state of the stream. Fix the state changes. --- ext/pulse/pulsesrc.c | 274 ++++++++++++++++++++++++++++++++------------------- ext/pulse/pulsesrc.h | 4 +- 2 files changed, 175 insertions(+), 103 deletions(-) diff --git a/ext/pulse/pulsesrc.c b/ext/pulse/pulsesrc.c index 9126ec13..a257908c 100644 --- a/ext/pulse/pulsesrc.c +++ b/ext/pulse/pulsesrc.c @@ -271,7 +271,7 @@ gst_pulsesrc_init (GstPulseSrc * pulsesrc, GstPulseSrcClass * klass) #endif pulsesrc->operation_success = FALSE; - pulsesrc->did_reset = FALSE; + pulsesrc->paused = FALSE; pulsesrc->in_read = FALSE; pulsesrc->mainloop = pa_threaded_mainloop_new (); @@ -510,7 +510,10 @@ gst_pulsesrc_stream_request_cb (pa_stream * s, size_t length, void *userdata) GST_LOG_OBJECT (pulsesrc, "got request for length %" G_GSIZE_FORMAT, length); - pa_threaded_mainloop_signal (pulsesrc->mainloop, 0); + if (pulsesrc->in_read) { + /* only signal when reading */ + pa_threaded_mainloop_signal (pulsesrc->mainloop, 0); + } } static void @@ -527,8 +530,6 @@ gst_pulsesrc_stream_latency_update_cb (pa_stream * s, void *userdata) GST_TIMEVAL_TO_TIME (info->timestamp), info->write_index_corrupt, info->write_index, info->read_index_corrupt, info->read_index, info->source_usec, info->configured_source_usec); - - pa_threaded_mainloop_signal (pulsesrc->mainloop, 0); } static void @@ -598,14 +599,16 @@ gst_pulsesrc_open (GstAudioSrc * asrc) g_free (name); return TRUE; + /* ERRORS */ unlock_and_fail: + { + gst_pulsesrc_destroy_context (pulsesrc); - gst_pulsesrc_destroy_context (pulsesrc); - - pa_threaded_mainloop_unlock (pulsesrc->mainloop); + pa_threaded_mainloop_unlock (pulsesrc->mainloop); - g_free (name); - return FALSE; + g_free (name); + return FALSE; + } } static gboolean @@ -643,43 +646,44 @@ gst_pulsesrc_read (GstAudioSrc * asrc, gpointer data, guint length) size_t sum = 0; pa_threaded_mainloop_lock (pulsesrc->mainloop); - pulsesrc->in_read = TRUE; + if (pulsesrc->paused) + goto was_paused; + while (length > 0) { size_t l; GST_LOG_OBJECT (pulsesrc, "reading %u bytes", length); + /*check if we have a leftover buffer */ if (!pulsesrc->read_buffer) { for (;;) { if (gst_pulsesrc_is_dead (pulsesrc)) goto unlock_and_fail; + /* read all available data, we keep a pointer to the data and the length + * and take from it what we need. */ if (pa_stream_peek (pulsesrc->stream, &pulsesrc->read_buffer, - &pulsesrc->read_buffer_length) < 0) { - GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, - ("pa_stream_peek() failed: %s", - pa_strerror (pa_context_errno (pulsesrc->context))), (NULL)); - goto unlock_and_fail; - } + &pulsesrc->read_buffer_length) < 0) + goto peek_failed; + GST_LOG_OBJECT (pulsesrc, "have data of %" G_GSIZE_FORMAT " bytes", pulsesrc->read_buffer_length); /* if we have data, process if */ - if (pulsesrc->read_buffer) + if (pulsesrc->read_buffer && pulsesrc->read_buffer_length) break; - if (pulsesrc->did_reset) - goto unlock_and_fail; - + /* now wait for more data to become available */ GST_LOG_OBJECT (pulsesrc, "waiting for data"); pa_threaded_mainloop_wait (pulsesrc->mainloop); + + if (pulsesrc->paused) + goto was_paused; } } - g_assert (pulsesrc->read_buffer && pulsesrc->read_buffer_length); - l = pulsesrc->read_buffer_length > length ? length : pulsesrc->read_buffer_length; @@ -690,78 +694,90 @@ gst_pulsesrc_read (GstAudioSrc * asrc, gpointer data, guint length) data = (guint8 *) data + l; length -= l; - sum += l; if (pulsesrc->read_buffer_length <= 0) { - if (pa_stream_drop (pulsesrc->stream) < 0) { - GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, - ("pa_stream_drop() failed: %s", - pa_strerror (pa_context_errno (pulsesrc->context))), (NULL)); - goto unlock_and_fail; - } + /* we copied all of the data, drop it now */ + if (pa_stream_drop (pulsesrc->stream) < 0) + goto drop_failed; + /* reset pointer to data */ pulsesrc->read_buffer = NULL; pulsesrc->read_buffer_length = 0; } } - pulsesrc->did_reset = FALSE; pulsesrc->in_read = FALSE; - pa_threaded_mainloop_unlock (pulsesrc->mainloop); + return sum; /* ERRORS */ +was_paused: + { + GST_LOG_OBJECT (pulsesrc, "we are paused"); + goto unlock_and_fail; + } +peek_failed: + { + GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, + ("pa_stream_peek() failed: %s", + pa_strerror (pa_context_errno (pulsesrc->context))), (NULL)); + goto unlock_and_fail; + } +drop_failed: + { + GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, + ("pa_stream_drop() failed: %s", + pa_strerror (pa_context_errno (pulsesrc->context))), (NULL)); + goto unlock_and_fail; + } unlock_and_fail: { - pulsesrc->did_reset = FALSE; pulsesrc->in_read = FALSE; - pa_threaded_mainloop_unlock (pulsesrc->mainloop); + return (guint) - 1; } } +/* return the delay in samples */ static guint gst_pulsesrc_delay (GstAudioSrc * asrc) { GstPulseSrc *pulsesrc = GST_PULSESRC_CAST (asrc); - pa_usec_t t; - - int negative; + int negative, res; + guint result; pa_threaded_mainloop_lock (pulsesrc->mainloop); + if (gst_pulsesrc_is_dead (pulsesrc)) + goto server_dead; - for (;;) { - if (gst_pulsesrc_is_dead (pulsesrc)) - goto unlock_and_fail; - - if (pa_stream_get_latency (pulsesrc->stream, &t, &negative) >= 0) - break; - - if (pa_context_errno (pulsesrc->context) != PA_ERR_NODATA) { - GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, - ("pa_stream_get_latency() failed: %s", - pa_strerror (pa_context_errno (pulsesrc->context))), (NULL)); - goto unlock_and_fail; - } - - pa_threaded_mainloop_wait (pulsesrc->mainloop); - } - - if (negative) - t = 0; + /* get the latency, this can fail when we don't have a latency update yet. + * We don't want to wait for latency updates here but we just return 0. */ + res = pa_stream_get_latency (pulsesrc->stream, &t, &negative); pa_threaded_mainloop_unlock (pulsesrc->mainloop); - return (guint) ((t * pulsesrc->sample_spec.rate) / 1000000LL); - -unlock_and_fail: + if (res > 0) { + GST_DEBUG_OBJECT (pulsesrc, "could not get latency"); + result = 0; + } else { + if (negative) + result = 0; + else + result = (guint) ((t * pulsesrc->sample_spec.rate) / 1000000LL); + } + return result; - pa_threaded_mainloop_unlock (pulsesrc->mainloop); - return 0; + /* ERRORS */ +server_dead: + { + GST_DEBUG_OBJECT (pulsesrc, "the server is dead"); + pa_threaded_mainloop_unlock (pulsesrc->mainloop); + return 0; + } } static gboolean @@ -955,6 +971,8 @@ gst_pulsesrc_prepare (GstAudioSrc * asrc, GstRingBufferSpec * spec) goto unlock_and_fail; } + pulsesrc->corked = TRUE; + for (;;) { pa_stream_state_t state; @@ -995,11 +1013,12 @@ gst_pulsesrc_prepare (GstAudioSrc * asrc, GstRingBufferSpec * spec) return TRUE; unlock_and_fail: + { + gst_pulsesrc_destroy_stream (pulsesrc); - gst_pulsesrc_destroy_stream (pulsesrc); - - pa_threaded_mainloop_unlock (pulsesrc->mainloop); - return FALSE; + pa_threaded_mainloop_unlock (pulsesrc->mainloop); + return FALSE; + } } static void @@ -1018,6 +1037,7 @@ gst_pulsesrc_reset (GstAudioSrc * asrc) pa_operation *o = NULL; pa_threaded_mainloop_lock (pulsesrc->mainloop); + GST_DEBUG_OBJECT (pulsesrc, "reset"); if (gst_pulsesrc_is_dead (pulsesrc)) goto unlock_and_fail; @@ -1031,9 +1051,9 @@ gst_pulsesrc_reset (GstAudioSrc * asrc) goto unlock_and_fail; } + pulsesrc->paused = TRUE; /* Inform anyone waiting in _write() call that it shall wakeup */ if (pulsesrc->in_read) { - pulsesrc->did_reset = TRUE; pa_threaded_mainloop_signal (pulsesrc->mainloop, 0); } @@ -1062,77 +1082,127 @@ unlock_and_fail: pa_threaded_mainloop_unlock (pulsesrc->mainloop); } -static void -gst_pulsesrc_pause (GstPulseSrc * pulsesrc, gboolean b) +/* update the corked state of a stream, must be called with the mainloop + * lock */ +static gboolean +gst_pulsesrc_set_corked (GstPulseSrc * psrc, gboolean corked, gboolean wait) { pa_operation *o = NULL; + gboolean res = FALSE; + + GST_DEBUG_OBJECT (psrc, "setting corked state to %d", corked); + if (psrc->corked != corked) { + if (!(o = pa_stream_cork (psrc->stream, corked, + gst_pulsesrc_success_cb, psrc))) + goto cork_failed; + + while (wait && pa_operation_get_state (o) == PA_OPERATION_RUNNING) { + pa_threaded_mainloop_wait (psrc->mainloop); + if (gst_pulsesrc_is_dead (psrc)) + goto server_dead; + } + psrc->corked = corked; + } else { + GST_DEBUG_OBJECT (psrc, "skipping, already in requested state"); + } + res = TRUE; - pa_threaded_mainloop_lock (pulsesrc->mainloop); - - if (gst_pulsesrc_is_dead (pulsesrc)) - goto unlock; +cleanup: + if (o) + pa_operation_unref (o); - if (!(o = pa_stream_cork (pulsesrc->stream, b, NULL, NULL))) { + return res; - GST_ELEMENT_ERROR (pulsesrc, RESOURCE, FAILED, + /* ERRORS */ +server_dead: + { + GST_DEBUG_OBJECT (psrc, "the server is dead"); + goto cleanup; + } +cork_failed: + { + GST_ELEMENT_ERROR (psrc, RESOURCE, FAILED, ("pa_stream_cork() failed: %s", - pa_strerror (pa_context_errno (pulsesrc->context))), (NULL)); - goto unlock; + pa_strerror (pa_context_errno (psrc->context))), (NULL)); + goto cleanup; } +} - while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { +/* start/resume playback ASAP */ +static gboolean +gst_pulsesrc_play (GstPulseSrc * psrc) +{ + pa_threaded_mainloop_lock (psrc->mainloop); + GST_DEBUG_OBJECT (psrc, "playing"); + psrc->paused = FALSE; + gst_pulsesrc_set_corked (psrc, FALSE, FALSE); + pa_threaded_mainloop_unlock (psrc->mainloop); - if (gst_pulsesrc_is_dead (pulsesrc)) - goto unlock; + return TRUE; +} - pa_threaded_mainloop_wait (pulsesrc->mainloop); +/* pause/stop playback ASAP */ +static gboolean +gst_pulsesrc_pause (GstPulseSrc * psrc) +{ + pa_threaded_mainloop_lock (psrc->mainloop); + GST_DEBUG_OBJECT (psrc, "pausing"); + /* make sure the commit method stops writing */ + psrc->paused = TRUE; + if (psrc->in_read) { + /* we are waiting in a read, signal */ + GST_DEBUG_OBJECT (psrc, "signal read"); + pa_threaded_mainloop_signal (psrc->mainloop, 0); } + pa_threaded_mainloop_unlock (psrc->mainloop); -unlock: - - if (o) - pa_operation_unref (o); - - pa_threaded_mainloop_unlock (pulsesrc->mainloop); + return TRUE; } static GstStateChangeReturn gst_pulsesrc_change_state (GstElement * element, GstStateChange transition) { + GstStateChangeReturn ret; GstPulseSrc *this = GST_PULSESRC_CAST (element); switch (transition) { - - case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - case GST_STATE_CHANGE_PLAYING_TO_PAUSED: - gst_pulsesrc_pause (this, - GST_STATE_TRANSITION_NEXT (transition) == GST_STATE_PAUSED); - break; - case GST_STATE_CHANGE_NULL_TO_READY: - if (!this->mixer) this->mixer = gst_pulsemixer_ctrl_new (G_OBJECT (this), this->server, this->device, GST_PULSEMIXER_SOURCE); - break; + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + /* uncork and start recording */ + gst_pulsesrc_play (this); + break; + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + /* stop recording ASAP by corking */ + pa_threaded_mainloop_lock (this->mainloop); + GST_DEBUG_OBJECT (this, "corking"); + gst_pulsesrc_set_corked (this, TRUE, FALSE); + pa_threaded_mainloop_unlock (this->mainloop); + break; + default: + break; + } - case GST_STATE_CHANGE_READY_TO_NULL: + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + switch (transition) { + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + /* now make sure we get out of the _read method */ + gst_pulsesrc_pause (this); + break; + case GST_STATE_CHANGE_READY_TO_NULL: if (this->mixer) { gst_pulsemixer_ctrl_free (this->mixer); this->mixer = NULL; } - break; - default: - ; + break; } - if (GST_ELEMENT_CLASS (parent_class)->change_state) - return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); - - return GST_STATE_CHANGE_SUCCESS; + return ret; } diff --git a/ext/pulse/pulsesrc.h b/ext/pulse/pulsesrc.h index 15d35f72..2358eba1 100644 --- a/ext/pulse/pulsesrc.h +++ b/ext/pulse/pulsesrc.h @@ -70,8 +70,10 @@ struct _GstPulseSrc GstPulseMixerCtrl *mixer; GstPulseProbe *probe; + gboolean corked; gboolean operation_success; - gboolean did_reset, in_read; + gboolean paused; + gboolean in_read; }; struct _GstPulseSrcClass -- cgit