summaryrefslogtreecommitdiffstats
path: root/ext
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@collabora.co.uk>2009-04-09 12:13:44 +0200
committerWim Taymans <wim.taymans@collabora.co.uk>2009-04-09 17:26:20 +0200
commit8855ed90c0bc0361bf4e7386b4a6c28dd108d403 (patch)
tree1d137e36bbfdf23f09a2fbb94d7cabdd037cd6e6 /ext
parent236baa5a132ab0fe9833c7f51ee5f9361049235a (diff)
pulsesink: add beginnings of pull-based scheduling
Diffstat (limited to 'ext')
-rw-r--r--ext/pulse/pulsesink.c135
1 files changed, 116 insertions, 19 deletions
diff --git a/ext/pulse/pulsesink.c b/ext/pulse/pulsesink.c
index 7e4f0014..75e35044 100644
--- a/ext/pulse/pulsesink.c
+++ b/ext/pulse/pulsesink.c
@@ -471,6 +471,99 @@ gst_pulsering_stream_state_cb (pa_stream * s, void *userdata)
}
static void
+gst_pulsering_pull (GstPulseSink * psink, GstPulseRingBuffer * pbuf)
+{
+ GstBaseSink *basesink;
+ GstBaseAudioSink *sink;
+ GstBuffer *buf;
+ GstRingBuffer *rbuf;
+ GstFlowReturn ret;
+ guint len;
+
+ basesink = GST_BASE_SINK (psink);
+ sink = GST_BASE_AUDIO_SINK (psink);
+ rbuf = GST_RING_BUFFER_CAST (pbuf);
+
+ GST_PAD_STREAM_LOCK (basesink->sinkpad);
+
+ len = 882;
+
+ /* would be nice to arrange for pad_alloc_buffer to return data -- as it is we
+ will copy twice, once into data, once into DMA */
+ GST_LOG_OBJECT (basesink, "pulling %d bytes offset %" G_GUINT64_FORMAT
+ " to fill audio buffer", len, basesink->offset);
+ ret =
+ gst_pad_pull_range (basesink->sinkpad, basesink->segment.last_stop, len,
+ &buf);
+
+ if (ret != GST_FLOW_OK) {
+ if (ret == GST_FLOW_UNEXPECTED)
+ goto eos;
+ else
+ goto error;
+ }
+
+ GST_PAD_PREROLL_LOCK (basesink->sinkpad);
+ if (basesink->flushing)
+ goto flushing;
+
+ /* complete preroll and wait for PLAYING */
+ ret = gst_base_sink_do_preroll (basesink, GST_MINI_OBJECT_CAST (buf));
+ if (ret != GST_FLOW_OK)
+ goto preroll_error;
+
+ if (len != GST_BUFFER_SIZE (buf)) {
+ GST_INFO_OBJECT (basesink,
+ "got different size than requested from sink pad: %u != %u", len,
+ GST_BUFFER_SIZE (buf));
+ len = MIN (GST_BUFFER_SIZE (buf), len);
+ }
+ basesink->segment.last_stop += len;
+
+ GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
+
+ GST_PAD_STREAM_UNLOCK (basesink->sinkpad);
+
+ return;
+
+error:
+ {
+ GST_WARNING_OBJECT (basesink, "Got flow '%s' but can't return it: %d",
+ gst_flow_get_name (ret), ret);
+ gst_ring_buffer_pause (rbuf);
+ GST_PAD_STREAM_UNLOCK (basesink->sinkpad);
+ return;
+ }
+eos:
+ {
+ /* FIXME: this is not quite correct; we'll be called endlessly until
+ * the sink gets shut down; maybe we should set a flag somewhere, or
+ * set segment.stop and segment.duration to the last sample or so */
+ GST_DEBUG_OBJECT (sink, "EOS");
+ gst_ring_buffer_pause (rbuf);
+ gst_element_post_message (GST_ELEMENT_CAST (sink),
+ gst_message_new_eos (GST_OBJECT_CAST (sink)));
+ GST_PAD_STREAM_UNLOCK (basesink->sinkpad);
+ }
+flushing:
+ {
+ GST_DEBUG_OBJECT (sink, "we are flushing");
+ gst_ring_buffer_pause (rbuf);
+ GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
+ GST_PAD_STREAM_UNLOCK (basesink->sinkpad);
+ return;
+ }
+preroll_error:
+ {
+ GST_DEBUG_OBJECT (sink, "error %s", gst_flow_get_name (ret));
+ gst_ring_buffer_pause (rbuf);
+ GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
+ GST_PAD_STREAM_UNLOCK (basesink->sinkpad);
+ return;
+ }
+}
+
+static void
gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata)
{
GstPulseSink *psink;
@@ -481,7 +574,10 @@ gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata)
GST_LOG_OBJECT (psink, "got request for length %" G_GSIZE_FORMAT, length);
- if (pbuf->in_commit) {
+ if (GST_RING_BUFFER_CAST (pbuf)->callback) {
+ /* in pull mode */
+ gst_pulsering_pull (psink, pbuf);
+ } else if (pbuf->in_commit) {
pa_threaded_mainloop_signal (psink->mainloop, 0);
}
}
@@ -586,23 +682,6 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
&buf_attr, flags, pv, NULL) < 0)
goto connect_failed;
- for (;;) {
- pa_stream_state_t state;
-
- state = pa_stream_get_state (pbuf->stream);
-
- GST_LOG_OBJECT (psink, "stream state is now %d", state);
-
- if (!PA_STREAM_IS_GOOD (state))
- goto connect_failed;
-
- if (state == PA_STREAM_READY)
- break;
-
- /* Wait until the stream is ready */
- pa_threaded_mainloop_wait (psink->mainloop);
- }
-
/* our clock will now start from 0 again */
clock = GST_AUDIO_CLOCK (GST_BASE_AUDIO_SINK (psink)->provided_clock);
gst_audio_clock_reset (clock, 0);
@@ -620,6 +699,22 @@ gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec)
pbuf->sample_spec.rate, GST_SECOND);
GST_LOG_OBJECT (psink, "sample offset %" G_GINT64_FORMAT, pbuf->offset);
+ for (;;) {
+ pa_stream_state_t state;
+
+ state = pa_stream_get_state (pbuf->stream);
+
+ GST_LOG_OBJECT (psink, "stream state is now %d", state);
+
+ if (!PA_STREAM_IS_GOOD (state))
+ goto connect_failed;
+
+ if (state == PA_STREAM_READY)
+ break;
+
+ /* Wait until the stream is ready */
+ pa_threaded_mainloop_wait (psink->mainloop);
+ }
GST_LOG_OBJECT (psink, "stream is acquired now");
@@ -701,6 +796,8 @@ gst_pulseringbuffer_activate (GstRingBuffer * buf, gboolean active)
psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf));
+ GST_DEBUG_OBJECT (psink, "activating");
+
return TRUE;
}
@@ -1338,7 +1435,7 @@ gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass)
g_assert ((pulsesink->mainloop = pa_threaded_mainloop_new ()));
g_assert (pa_threaded_mainloop_start (pulsesink->mainloop) == 0);
-// GST_BASE_SINK (pulsesink)->can_activate_pull = TRUE;
+ GST_BASE_SINK (pulsesink)->can_activate_pull = TRUE;
pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink), G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device, TRUE, FALSE); /* TRUE for sinks, FALSE for sources */