diff options
author | Wim Taymans <wim.taymans@gmail.com> | 2007-08-10 17:16:53 +0000 |
---|---|---|
committer | Tim-Philipp Müller <tim.muller@collabora.co.uk> | 2009-08-11 02:30:28 +0100 |
commit | cdd82f2a95855f0a3cc0b7be2f1b69d99eaba729 (patch) | |
tree | 22c5df70e58fd7909bb61dba11b25cbfc53ecbcb /gst/rtpmanager/gstrtpjitterbuffer.c | |
parent | 366a7565525789adedcef38236e7ee81b590c981 (diff) |
gst/rtpmanager/: Remove complicated async queue and replace with more simple jitterbuffer code while also fixing some...
Original commit message from CVS:
* gst/rtpmanager/Makefile.am:
* gst/rtpmanager/async_jitter_queue.c:
* gst/rtpmanager/async_jitter_queue.h:
* gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_class_init),
(rtp_jitter_buffer_init), (rtp_jitter_buffer_finalize),
(rtp_jitter_buffer_new), (compare_seqnum),
(rtp_jitter_buffer_insert), (rtp_jitter_buffer_pop),
(rtp_jitter_buffer_flush), (rtp_jitter_buffer_num_packets),
(rtp_jitter_buffer_get_ts_diff):
* gst/rtpmanager/rtpjitterbuffer.h:
Remove complicated async queue and replace with more simple jitterbuffer
code while also fixing some bugs.
* gst/rtpmanager/gstrtpbin-marshal.list:
* gst/rtpmanager/gstrtpbin.c: (on_new_ssrc), (on_ssrc_collision),
(on_ssrc_validated), (on_bye_ssrc), (on_bye_timeout), (on_timeout),
(create_session), (gst_rtp_bin_class_init), (create_recv_rtp),
(create_send_rtp):
* gst/rtpmanager/gstrtpbin.h:
* gst/rtpmanager/gstrtpjitterbuffer.c:
(gst_rtp_jitter_buffer_init), (gst_rtp_jitter_buffer_dispose),
(gst_jitter_buffer_sink_parse_caps),
(gst_rtp_jitter_buffer_flush_start),
(gst_rtp_jitter_buffer_flush_stop),
(gst_rtp_jitter_buffer_change_state),
(gst_rtp_jitter_buffer_sink_event), (gst_rtp_jitter_buffer_chain),
(gst_rtp_jitter_buffer_loop), (gst_rtp_jitter_buffer_set_property):
* gst/rtpmanager/gstrtpsession.c: (on_new_ssrc),
(on_ssrc_collision), (on_ssrc_validated), (on_bye_ssrc),
(on_bye_timeout), (on_timeout), (gst_rtp_session_class_init),
(gst_rtp_session_init):
* gst/rtpmanager/gstrtpsession.h:
* gst/rtpmanager/rtpsession.c: (on_bye_ssrc), (session_cleanup):
Use new jitterbuffer code.
Expose some new signals in preparation for handling EOS.
Diffstat (limited to 'gst/rtpmanager/gstrtpjitterbuffer.c')
-rw-r--r-- | gst/rtpmanager/gstrtpjitterbuffer.c | 253 |
1 files changed, 116 insertions, 137 deletions
diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 79d06788..fe85f87f 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -4,7 +4,7 @@ * Copyright 2007 Collabora Ltd, * Copyright 2007 Nokia Corporation * @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>. - * Copyright 2007 Wim Taymans <wim@fluendo.com> + * Copyright 2007 Wim Taymans <wim.taymans@gmail.com> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -72,7 +72,7 @@ #include "gstrtpbin-marshal.h" #include "gstrtpjitterbuffer.h" -#include "async_jitter_queue.h" +#include "rtpjitterbuffer.h" GST_DEBUG_CATEGORY (rtpjitterbuffer_debug); #define GST_CAT_DEFAULT (rtpjitterbuffer_debug) @@ -87,7 +87,7 @@ GST_ELEMENT_DETAILS ("RTP packet jitter-buffer", "Filter/Network/RTP", "A buffer that deals with network jitter and other transmission faults", "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, " - "Wim Taymans <wim@fluendo.com>"); + "Wim Taymans <wim.taymans@gmail.com>"); /* RTPJitterBuffer signals and args */ enum @@ -107,11 +107,32 @@ enum PROP_DROP_ON_LATENCY }; +#define JBUF_LOCK(priv) (g_mutex_lock ((priv)->jbuf_lock)) + +#define JBUF_LOCK_CHECK(priv,label) G_STMT_START { \ + JBUF_LOCK (priv); \ + if (priv->srcresult != GST_FLOW_OK) \ + goto label; \ +} G_STMT_END + +#define JBUF_UNLOCK(priv) (g_mutex_unlock ((priv)->jbuf_lock)) +#define JBUF_WAIT(priv) (g_cond_wait ((priv)->jbuf_cond, (priv)->jbuf_lock)) + +#define JBUF_WAIT_CHECK(priv,label) G_STMT_START { \ + JBUF_WAIT(priv); \ + if (priv->srcresult != GST_FLOW_OK) \ + goto label; \ +} G_STMT_END + +#define JBUF_SIGNAL(priv) (g_cond_signal ((priv)->jbuf_cond)) + struct _GstRTPJitterBufferPrivate { GstPad *sinkpad, *srcpad; - AsyncJitterQueue *queue; + RTPJitterBuffer *jbuf; + GMutex *jbuf_lock; + GCond *jbuf_cond; /* properties */ guint latency_ms; @@ -122,12 +143,16 @@ struct _GstRTPJitterBufferPrivate /* the next expected seqnum */ guint32 next_seqnum; + /* state */ + gboolean eos; + /* clock rate and rtp timestamp offset */ gint32 clock_rate; gint64 clock_base; /* when we are shutting down */ GstFlowReturn srcresult; + gboolean blocked; /* for sync */ GstSegment segment; @@ -292,9 +317,9 @@ gst_rtp_jitter_buffer_init (GstRTPJitterBuffer * jitterbuffer, priv->latency_ms = DEFAULT_LATENCY_MS; priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY; - priv->queue = async_jitter_queue_new (); - async_jitter_queue_set_low_threshold (priv->queue, LOW_THRESHOLD); - async_jitter_queue_set_high_threshold (priv->queue, HIGH_THRESHOLD); + priv->jbuf = rtp_jitter_buffer_new (); + priv->jbuf_lock = g_mutex_new (); + priv->jbuf_cond = g_cond_new (); priv->waiting_seqnum = -1; @@ -332,9 +357,9 @@ gst_rtp_jitter_buffer_dispose (GObject * object) GstRTPJitterBuffer *jitterbuffer; jitterbuffer = GST_RTP_JITTER_BUFFER (object); - if (jitterbuffer->priv->queue) { - async_jitter_queue_unref (jitterbuffer->priv->queue); - jitterbuffer->priv->queue = NULL; + if (jitterbuffer->priv->jbuf) { + g_object_unref (jitterbuffer->priv->jbuf); + jitterbuffer->priv->jbuf = NULL; } G_OBJECT_CLASS (parent_class)->dispose (object); @@ -430,9 +455,6 @@ gst_jitter_buffer_sink_parse_caps (GstRTPJitterBuffer * jitterbuffer, } else priv->next_seqnum = -1; - async_jitter_queue_set_max_queue_length (priv->queue, - priv->latency_ms * priv->clock_rate / 1000); - return TRUE; /* ERRORS */ @@ -470,34 +492,24 @@ gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps) } static void -free_func (gpointer data, GstRTPJitterBuffer * user_data) -{ - if (GST_IS_BUFFER (data)) - gst_buffer_unref (GST_BUFFER_CAST (data)); - else - gst_event_unref (GST_EVENT_CAST (data)); -} - -static void gst_rtp_jitter_buffer_flush_start (GstRTPJitterBuffer * jitterbuffer) { GstRTPJitterBufferPrivate *priv; priv = jitterbuffer->priv; - async_jitter_queue_lock (priv->queue); + JBUF_LOCK (priv); /* mark ourselves as flushing */ priv->srcresult = GST_FLOW_WRONG_STATE; GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue"); /* this unblocks any waiting pops on the src pad task */ - async_jitter_queue_set_flushing_unlocked (jitterbuffer->priv->queue, - (GFunc) free_func, jitterbuffer); + JBUF_SIGNAL (priv); + rtp_jitter_buffer_flush (priv->jbuf); /* unlock clock, we just unschedule, the entry will be released by the * locking streaming thread. */ if (priv->clock_id) gst_clock_id_unschedule (priv->clock_id); - - async_jitter_queue_unlock (priv->queue); + JBUF_UNLOCK (priv); } static void @@ -507,7 +519,7 @@ gst_rtp_jitter_buffer_flush_stop (GstRTPJitterBuffer * jitterbuffer) priv = jitterbuffer->priv; - async_jitter_queue_lock (priv->queue); + JBUF_LOCK (priv); GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue"); /* Mark as non flushing */ priv->srcresult = GST_FLOW_OK; @@ -515,9 +527,8 @@ gst_rtp_jitter_buffer_flush_stop (GstRTPJitterBuffer * jitterbuffer) priv->last_popped_seqnum = -1; priv->next_seqnum = -1; priv->clock_rate = -1; - /* allow pops from the src pad task */ - async_jitter_queue_unset_flushing_unlocked (jitterbuffer->priv->queue); - async_jitter_queue_unlock (priv->queue); + priv->eos = FALSE; + JBUF_UNLOCK (priv); } static gboolean @@ -566,21 +577,20 @@ gst_rtp_jitter_buffer_change_state (GstElement * element, case GST_STATE_CHANGE_NULL_TO_READY: break; case GST_STATE_CHANGE_READY_TO_PAUSED: - async_jitter_queue_lock (priv->queue); + JBUF_LOCK (priv); /* reset negotiated values */ priv->clock_rate = -1; priv->clock_base = -1; /* block until we go to PLAYING */ - async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue, - TRUE); - async_jitter_queue_unlock (priv->queue); + priv->blocked = TRUE; + JBUF_UNLOCK (priv); break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - async_jitter_queue_lock (priv->queue); + JBUF_LOCK (priv); /* unblock to allow streaming in PLAYING */ - async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue, - FALSE); - async_jitter_queue_unlock (priv->queue); + priv->blocked = FALSE; + JBUF_SIGNAL (priv); + JBUF_UNLOCK (priv); break; default: break; @@ -596,11 +606,10 @@ gst_rtp_jitter_buffer_change_state (GstElement * element, ret = GST_STATE_CHANGE_NO_PREROLL; break; case GST_STATE_CHANGE_PLAYING_TO_PAUSED: - async_jitter_queue_lock (priv->queue); + JBUF_LOCK (priv); /* block to stop streaming when PAUSED */ - async_jitter_queue_set_blocking_unlocked (jitterbuffer->priv->queue, - TRUE); - async_jitter_queue_unlock (priv->queue); + priv->blocked = TRUE; + JBUF_UNLOCK (priv); if (ret != GST_STATE_CHANGE_FAILURE) ret = GST_STATE_CHANGE_NO_PREROLL; break; @@ -630,30 +639,6 @@ priv_compare_rtp_seq_lt (guint16 a, guint16 b) } } -/** - * gets the seqnum from the buffers and compare them - */ -static gint -compare_rtp_buffers_seq_num (GstBuffer * a, GstBuffer * b) -{ - gint ret; - - if (GST_IS_BUFFER (a) && GST_IS_BUFFER (b)) { - /* two buffers */ - ret = priv_compare_rtp_seq_lt - (gst_rtp_buffer_get_seq (GST_BUFFER_CAST (a)), - gst_rtp_buffer_get_seq (GST_BUFFER_CAST (b))); - } else { - /* one of them is an event, the event always goes before the other element - * so we return -1. */ - if (GST_IS_EVENT (a)) - ret = -1; - else - ret = 1; - } - return ret; -} - static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event) { @@ -707,16 +692,20 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event) case GST_EVENT_EOS: { /* push EOS in queue. We always push it at the head */ - async_jitter_queue_lock (priv->queue); - GST_DEBUG_OBJECT (jitterbuffer, "queuing EOS"); + JBUF_LOCK (priv); /* check for flushing, we need to discard the event and return FALSE when * we are flushing */ ret = priv->srcresult == GST_FLOW_OK; - if (ret) - async_jitter_queue_push_unlocked (priv->queue, event); - else + if (ret) { + GST_DEBUG_OBJECT (jitterbuffer, "queuing EOS"); + priv->eos = TRUE; + JBUF_SIGNAL (priv); + } else { + GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s", + gst_flow_get_name (priv->srcresult)); gst_event_unref (event); - async_jitter_queue_unlock (priv->queue); + } + JBUF_UNLOCK (priv); break; } default: @@ -780,7 +769,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) GstRTPJitterBuffer *jitterbuffer; GstRTPJitterBufferPrivate *priv; guint16 seqnum; - GstFlowReturn ret; + GstFlowReturn ret = GST_FLOW_OK; jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad)); @@ -803,10 +792,10 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) seqnum = gst_rtp_buffer_get_seq (buffer); GST_DEBUG_OBJECT (jitterbuffer, "Received packet #%d", seqnum); - async_jitter_queue_lock (priv->queue); - ret = priv->srcresult; - if (ret != GST_FLOW_OK) - goto out_flushing; + JBUF_LOCK_CHECK (priv, out_flushing); + /* don't accept more data on EOS */ + if (priv->eos) + goto have_eos; /* let's check if this buffer is too late, we cannot accept packets with * bigger seqnum than the one we already pushed. */ @@ -818,14 +807,18 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) /* let's drop oldest packet if the queue is already full and drop-on-latency * is set. */ if (priv->drop_on_latency) { - if (async_jitter_queue_length_ts_units_unlocked (priv->queue) >= - priv->latency_ms * priv->clock_rate / 1000) { + guint64 latency_ts; + + latency_ts = + gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000); + + if (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts) { GstBuffer *old_buf; GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d", seqnum); - old_buf = async_jitter_queue_pop_unlocked (priv->queue); + old_buf = rtp_jitter_buffer_pop (priv->jbuf); gst_buffer_unref (old_buf); } } @@ -833,10 +826,12 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) /* now insert the packet into the queue in sorted order. This function returns * FALSE if a packet with the same seqnum was already in the queue, meaning we * have a duplicate. */ - if (!async_jitter_queue_push_sorted_unlocked (priv->queue, buffer, - (GCompareDataFunc) compare_rtp_buffers_seq_num, NULL)) + if (!rtp_jitter_buffer_insert (priv->jbuf, buffer)) goto duplicate; + /* signal addition of new buffer */ + JBUF_SIGNAL (priv); + /* let's unschedule and unblock any waiting buffers. We only want to do this * if there is a currently waiting newer (> seqnum) buffer */ if (priv->clock_id) { @@ -846,11 +841,11 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) } } - GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d on queue %d", - seqnum, async_jitter_queue_length_unlocked (priv->queue)); + GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets", + seqnum, rtp_jitter_buffer_num_packets (priv->jbuf)); finished: - async_jitter_queue_unlock (priv->queue); + JBUF_UNLOCK (priv); gst_object_unref (jitterbuffer); @@ -875,10 +870,18 @@ not_negotiated: } out_flushing: { + ret = priv->srcresult; GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret)); gst_buffer_unref (buffer); goto finished; } +have_eos: + { + ret = GST_FLOW_UNEXPECTED; + GST_DEBUG_OBJECT (jitterbuffer, "we are EOS, refusing buffer"); + gst_buffer_unref (buffer); + goto finished; + } too_late: { GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already" @@ -908,8 +911,7 @@ static void gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer) { GstRTPJitterBufferPrivate *priv; - gpointer elem; - GstBuffer *outbuf; + GstBuffer *outbuf = NULL; GstFlowReturn result; guint16 seqnum; guint32 rtp_time; @@ -918,44 +920,24 @@ gst_rtp_jitter_buffer_loop (GstRTPJitterBuffer * jitterbuffer) priv = jitterbuffer->priv; - async_jitter_queue_lock (priv->queue); + JBUF_LOCK_CHECK (priv, flushing); again: GST_DEBUG_OBJECT (jitterbuffer, "Popping item"); - /* pop a buffer, we will get NULL if the queue was shut down */ - elem = async_jitter_queue_pop_unlocked (priv->queue); - if (!elem) - goto no_elem; - - /* special code for events */ - if (G_UNLIKELY (GST_IS_EVENT (elem))) { - GstEvent *event = GST_EVENT_CAST (elem); - - switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_EOS: - GST_DEBUG_OBJECT (jitterbuffer, "Popped EOS from queue"); - /* we don't expect more data now, makes upstream perform EOS actions */ - priv->srcresult = GST_FLOW_UNEXPECTED; - break; - default: - GST_DEBUG_OBJECT (jitterbuffer, "Popped event %s from queue", - GST_EVENT_TYPE_NAME (event)); - break; - } - async_jitter_queue_unlock (priv->queue); - - /* push event */ - gst_pad_push_event (priv->srcpad, event); - return; + /* wait if we are blocked or don't have a packet and eos */ + while (priv->blocked || !(rtp_jitter_buffer_num_packets (priv->jbuf) + || priv->eos)) { + JBUF_WAIT_CHECK (priv, flushing); } + if (priv->eos) + goto do_eos; - /* we know it's a buffer now */ - outbuf = GST_BUFFER_CAST (elem); + /* pop a buffer, we must have a buffer now */ + outbuf = rtp_jitter_buffer_pop (priv->jbuf); seqnum = gst_rtp_buffer_get_seq (outbuf); - GST_DEBUG_OBJECT (jitterbuffer, "Popped buffer #%d from queue %d", - gst_rtp_buffer_get_seq (outbuf), - async_jitter_queue_length_unlocked (priv->queue)); + GST_DEBUG_OBJECT (jitterbuffer, "Popped buffer #%d, now %d left", + seqnum, rtp_jitter_buffer_num_packets (priv->jbuf)); /* If we don't know what the next seqnum should be (== -1) we have to wait * because it might be possible that we are not receiving this buffer in-order, @@ -1032,11 +1014,11 @@ again: GST_OBJECT_UNLOCK (jitterbuffer); /* release the lock so that the other end can push stuff or unlock */ - async_jitter_queue_unlock (priv->queue); + JBUF_UNLOCK (priv); ret = gst_clock_id_wait (id, &jitter); - async_jitter_queue_lock (priv->queue); + JBUF_LOCK (priv); /* and free the entry */ gst_clock_id_unref (id); priv->clock_id = NULL; @@ -1054,8 +1036,7 @@ again: GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled, will retry to push with new buffer"); /* reinserting popped buffer into queue */ - if (!async_jitter_queue_push_sorted_unlocked (priv->queue, outbuf, - (GCompareDataFunc) compare_rtp_buffers_seq_num, NULL)) { + if (!rtp_jitter_buffer_insert (priv->jbuf, outbuf)) { GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping", seqnum); priv->num_duplicates++; @@ -1087,7 +1068,7 @@ push_buffer: * so the other end can push stuff in the queue again. */ priv->last_popped_seqnum = seqnum; priv->next_seqnum = (seqnum + 1) & 0xffff; - async_jitter_queue_unlock (priv->queue); + JBUF_UNLOCK (priv); /* push buffer */ GST_DEBUG_OBJECT (jitterbuffer, "Pushing buffer %d", seqnum); @@ -1098,20 +1079,22 @@ push_buffer: return; /* ERRORS */ -no_elem: +do_eos: { /* store result, we are flushing now */ - GST_DEBUG_OBJECT (jitterbuffer, "Pop returned NULL, we're flushing"); - priv->srcresult = GST_FLOW_WRONG_STATE; + GST_DEBUG_OBJECT (jitterbuffer, "We are EOS, pushing EOS downstream"); + priv->srcresult = GST_FLOW_UNEXPECTED; gst_pad_pause_task (priv->srcpad); - async_jitter_queue_unlock (priv->queue); + gst_pad_push_event (priv->srcpad, gst_event_new_eos ()); + JBUF_UNLOCK (priv); return; } flushing: { GST_DEBUG_OBJECT (jitterbuffer, "we are flushing"); - gst_buffer_unref (outbuf); - async_jitter_queue_unlock (priv->queue); + if (outbuf) + gst_buffer_unref (outbuf); + JBUF_UNLOCK (priv); return; } pause: @@ -1120,13 +1103,13 @@ pause: GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason); - async_jitter_queue_lock (priv->queue); + JBUF_LOCK (priv); /* store result */ priv->srcresult = result; /* we don't post errors or anything because upstream will do that for us * when we pass the return value upstream. */ gst_pad_pause_task (priv->srcpad); - async_jitter_queue_unlock (priv->queue); + JBUF_UNLOCK (priv); return; } } @@ -1194,11 +1177,7 @@ gst_rtp_jitter_buffer_set_property (GObject * object, old_latency = jitterbuffer->priv->latency_ms; jitterbuffer->priv->latency_ms = new_latency; - if (jitterbuffer->priv->clock_rate != -1) { - async_jitter_queue_set_max_queue_length (jitterbuffer->priv->queue, - gst_util_uint64_scale_int (new_latency, - jitterbuffer->priv->clock_rate, 1000)); - } + /* post message if latency changed, this will infor the parent pipeline * that a latency reconfiguration is possible. */ if (new_latency != old_latency) { |