From 918c9448f2513543885d8e6effec5822b06c971e Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Fri, 27 Mar 2009 17:44:57 +0100 Subject: rtpbin: add on_npt_stop signal Add the on_npt_stop signal to rtpbin and rtpjitterbuffer to notify the application that the NPT stop position has been reached. --- gst/rtpmanager/gstrtpbin.c | 23 +++++ gst/rtpmanager/gstrtpbin.h | 1 + gst/rtpmanager/gstrtpjitterbuffer.c | 179 +++++++++++++++++++++++++++++++++--- gst/rtpmanager/gstrtpjitterbuffer.h | 5 +- 4 files changed, 194 insertions(+), 14 deletions(-) (limited to 'gst/rtpmanager') diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index a845bbbd..4322ee00 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -233,6 +233,7 @@ enum SIGNAL_ON_BYE_TIMEOUT, SIGNAL_ON_TIMEOUT, SIGNAL_ON_SENDER_TIMEOUT, + SIGNAL_ON_NPT_STOP, LAST_SIGNAL }; @@ -459,6 +460,13 @@ on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess) sess->id, ssrc); } +static void +on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream) +{ + g_signal_emit (stream->bin, gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP], 0, + stream->session->id, stream->ssrc); +} + /* create a session with the given id. Must be called with RTP_BIN_LOCK */ static GstRtpBinSession * create_session (GstRtpBin * rtpbin, gint id) @@ -1091,6 +1099,7 @@ create_stream (GstRtpBinSession * session, guint32 ssrc) /* provide clock_rate to the jitterbuffer when needed */ g_signal_connect (buffer, "request-pt-map", (GCallback) pt_map_requested, session); + g_signal_connect (buffer, "on-npt-stop", (GCallback) on_npt_stop, stream); /* configure latency and packet lost */ g_object_set (buffer, "latency", session->bin->latency, NULL); @@ -1375,6 +1384,20 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_UINT); + /** + * GstRtpBin::on-npt-stop: + * @rtpbin: the object which received the signal + * @session: the session + * @ssrc: the SSRC + * + * Notify that SSRC sender has sent data up to the configured NPT stop time. + */ + gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP] = + g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpBinClass, on_npt_stop), + NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_UINT, G_TYPE_NONE, 2, + G_TYPE_UINT, G_TYPE_UINT); + g_object_class_install_property (gobject_class, PROP_SDES_CNAME, g_param_spec_string ("sdes-cname", "SDES CNAME", "The CNAME to put in SDES messages of this session", diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h index e7658f54..a984d4da 100644 --- a/gst/rtpmanager/gstrtpbin.h +++ b/gst/rtpmanager/gstrtpbin.h @@ -82,6 +82,7 @@ struct _GstRtpBinClass { void (*on_bye_timeout) (GstRtpBin *rtpbin, guint session, guint32 ssrc); void (*on_timeout) (GstRtpBin *rtpbin, guint session, guint32 ssrc); void (*on_sender_timeout) (GstRtpBin *rtpbin, guint session, guint32 ssrc); + void (*on_npt_stop) (GstRtpBin *rtpbin, guint session, guint32 ssrc); }; GType gst_rtp_bin_get_type (void); diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 2d3d4458..0fa23959 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -88,6 +88,7 @@ enum SIGNAL_REQUEST_PT_MAP, SIGNAL_CLEAR_PT_MAP, SIGNAL_HANDLE_SYNC, + SIGNAL_ON_NPT_STOP, LAST_SIGNAL }; @@ -151,6 +152,15 @@ struct _GstRtpJitterBufferPrivate /* the next expected seqnum we receive */ guint32 next_in_seqnum; + /* start and stop ranges */ + GstClockTime npt_start; + GstClockTime npt_stop; + guint64 ext_timestamp; + guint64 last_elapsed; + guint64 estimated_eos; + GstClockID eos_id; + gboolean reached_npt_stop; + /* state */ gboolean eos; @@ -355,6 +365,19 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED, G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE); + /** + * GstRtpJitterBuffer::on-npt-stop + * @buffer: the object which received the signal + * + * Signal that the jitterbufer has pushed the RTP packet that corresponds to + * the npt-stop position. + */ + gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] = + g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass, + on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID, + G_TYPE_NONE, 0, G_TYPE_NONE); + /** * GstRtpJitterBuffer::clear-pt-map: * @buffer: the object which received the signal @@ -629,6 +652,7 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer, GstRtpJitterBufferPrivate *priv; GstStructure *caps_struct; guint val; + GstClockTime tval; priv = jitterbuffer->priv; @@ -647,14 +671,15 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer, GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate); - /* gah, clock-base is uint. If we don't have a base, we will use the first - * buffer timestamp as the base time. This will screw up sync but it's better - * than nothing. */ + /* The clock base is the RTP timestamp corrsponding to the npt-start value. We + * can use this to track the amount of time elapsed on the sender. */ if (gst_structure_get_uint (caps_struct, "clock-base", &val)) priv->clock_base = val; else priv->clock_base = -1; + priv->ext_timestamp = priv->clock_base; + GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT, priv->clock_base); @@ -668,6 +693,23 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer, GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum); + /* the start and stop times. The seqnum-base corresponds to the start time. We + * will keep track of the seqnums on the output and when we reach the one + * corresponding to npt-stop, we emit the npt-stop-reached signal */ + if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval)) + priv->npt_start = tval; + else + priv->npt_start = 0; + + if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval)) + priv->npt_stop = tval; + else + priv->npt_stop = -1; + + GST_DEBUG_OBJECT (jitterbuffer, + "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT, + GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop)); + return TRUE; /* ERRORS */ @@ -800,6 +842,10 @@ gst_rtp_jitter_buffer_change_state (GstElement * element, /* reset negotiated values */ priv->clock_rate = -1; priv->clock_base = -1; + priv->last_elapsed = 0; + priv->estimated_eos = -1; + priv->reached_npt_stop = FALSE; + priv->ext_timestamp = -1; priv->peer_latency = 0; priv->last_pt = -1; /* block until we go to PLAYING */ @@ -1079,6 +1125,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer) timestamp); seqnum = gst_rtp_buffer_get_seq (buffer); + GST_DEBUG_OBJECT (jitterbuffer, "Received packet #%d at time %" GST_TIME_FORMAT, seqnum, GST_TIME_ARGS (timestamp)); @@ -1254,6 +1301,48 @@ apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp) return timestamp; } +static GstClockTime +get_sync_time (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp) +{ + GstClockTime result; + GstRtpJitterBufferPrivate *priv; + + priv = jitterbuffer->priv; + + result = timestamp + GST_ELEMENT_CAST (jitterbuffer)->base_time; + /* add latency, this includes our own latency and the peer latency. */ + result += (priv->latency_ms * GST_MSECOND); + result += priv->peer_latency; + + return result; +} + +static gboolean +eos_reached (GstClock * clock, GstClockTime time, GstClockID id, + GstRtpJitterBuffer * jitterbuffer) +{ + GstRtpJitterBufferPrivate *priv; + + priv = jitterbuffer->priv; + + JBUF_LOCK_CHECK (priv, flushing); + if (priv->waiting) { + GST_DEBUG_OBJECT (jitterbuffer, "got the NPT timeout"); + priv->reached_npt_stop = TRUE; + JBUF_SIGNAL (priv); + } + JBUF_UNLOCK (priv); + + return TRUE; + + /* ERRORS */ +flushing: + { + JBUF_UNLOCK (priv); + return FALSE; + } +} + /** * This funcion will push out buffers on the source pad. * @@ -1272,6 +1361,9 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer) GstClockTime timestamp, out_time; gboolean discont = FALSE; gint gap; + GstClock *clock; + GstClockID id; + GstClockTime sync_time; priv = jitterbuffer->priv; @@ -1279,6 +1371,7 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer) again: GST_DEBUG_OBJECT (jitterbuffer, "Peeking item"); while (TRUE) { + id = NULL; /* always wait if we are blocked */ if (G_LIKELY (!priv->blocked)) { /* if we have a packet, we can exit the loop and grab it */ @@ -1287,11 +1380,38 @@ again: /* no packets but we are EOS, do eos logic */ if (G_UNLIKELY (priv->eos)) goto do_eos; + /* underrun, wait for packets or flushing now if we are expecting an EOS + * timeout, set the async timer for it too */ + if (priv->estimated_eos != -1 && !priv->reached_npt_stop) { + sync_time = get_sync_time (jitterbuffer, priv->estimated_eos); + + GST_OBJECT_LOCK (jitterbuffer); + clock = GST_ELEMENT_CLOCK (jitterbuffer); + if (clock) { + GST_DEBUG_OBJECT (jitterbuffer, "scheduling timeout"); + id = gst_clock_new_single_shot_id (clock, sync_time); + gst_clock_id_wait_async (id, (GstClockCallback) eos_reached, + jitterbuffer); + } + GST_OBJECT_UNLOCK (jitterbuffer); + } } - /* underrun, wait for packets or flushing now */ + /* now we wait */ priv->waiting = TRUE; - JBUF_WAIT_CHECK (priv, flushing); + JBUF_WAIT (priv); priv->waiting = FALSE; + + if (id) { + /* unschedule any pending async notifications we might have */ + gst_clock_id_unschedule (id); + gst_clock_id_unref (id); + } + if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) + goto flushing; + + if (id && priv->reached_npt_stop) { + goto do_npt_stop; + } } /* peek a buffer, we're just looking at the timestamp and the sequence number. @@ -1346,10 +1466,7 @@ again: * must be before this packet) we can wait for it until the deadline for this * packet expires. */ if (G_UNLIKELY (gap != 0 && out_time != -1)) { - GstClockID id; - GstClockTime sync_time; GstClockReturn ret; - GstClock *clock; GstClockTime duration = GST_CLOCK_TIME_NONE; if (gap > 0) { @@ -1395,10 +1512,7 @@ again: GST_TIME_ARGS (out_time)); /* prepare for sync against clock */ - sync_time = out_time + GST_ELEMENT_CAST (jitterbuffer)->base_time; - /* add latency, this includes our own latency and the peer latency. */ - sync_time += (priv->latency_ms * GST_MSECOND); - sync_time += priv->peer_latency; + sync_time = get_sync_time (jitterbuffer, out_time); /* create an entry for the clock */ id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time); @@ -1484,6 +1598,37 @@ push_buffer: /* apply timestamp with offset to buffer now */ GST_BUFFER_TIMESTAMP (outbuf) = out_time; + /* update the elapsed time when we need to check against the npt stop time. */ + if (priv->npt_stop != -1 && priv->ext_timestamp != -1 + && priv->clock_base != -1) { + guint64 ext_time, elapsed, estimated; + guint32 rtp_time; + + rtp_time = gst_rtp_buffer_get_timestamp (outbuf); + + ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time); + if (ext_time > priv->clock_base) + elapsed = ext_time - priv->clock_base; + else + elapsed = 0; + + elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate); + + if (elapsed > priv->last_elapsed) { + priv->last_elapsed = elapsed; + + if (elapsed > 0) + estimated = gst_util_uint64_scale (out_time, priv->npt_stop, elapsed); + else + estimated = -1; + + GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %" + GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated)); + + priv->estimated_eos = estimated; + } + } + /* now we are ready to push the buffer. Save the seqnum and release the lock * so the other end can push stuff in the queue again. */ priv->last_popped_seqnum = seqnum; @@ -1512,6 +1657,16 @@ do_eos: JBUF_UNLOCK (priv); return; } +do_npt_stop: + { + /* store result, we are flushing now */ + GST_DEBUG_OBJECT (jitterbuffer, "We reached the NPT stop"); + JBUF_UNLOCK (priv); + + g_signal_emit (jitterbuffer, + gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP], 0, NULL); + return; + } flushing: { GST_DEBUG_OBJECT (jitterbuffer, "we are flushing"); diff --git a/gst/rtpmanager/gstrtpjitterbuffer.h b/gst/rtpmanager/gstrtpjitterbuffer.h index 45e68979..6d7610e5 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.h +++ b/gst/rtpmanager/gstrtpjitterbuffer.h @@ -69,9 +69,10 @@ struct _GstRtpJitterBufferClass GstElementClass parent_class; /* signals */ - GstCaps* (*request_pt_map) (GstRtpJitterBuffer *buffer, guint pt); + GstCaps* (*request_pt_map) (GstRtpJitterBuffer *buffer, guint pt); - void (*handle_sync) (GstRtpJitterBuffer *buffer, GstStructure *s); + void (*handle_sync) (GstRtpJitterBuffer *buffer, GstStructure *s); + void (*on_npt_stop) (GstRtpJitterBuffer *buffer); /* actions */ void (*clear_pt_map) (GstRtpJitterBuffer *buffer); -- cgit