diff options
| author | Wim Taymans <wim.taymans@gmail.com> | 2007-04-18 18:58:53 +0000 | 
|---|---|---|
| committer | Tim-Philipp Müller <tim.muller@collabora.co.uk> | 2009-08-11 02:30:25 +0100 | 
| commit | 54b3dec1f53c823bd947685bf89c4d0e041c2e2a (patch) | |
| tree | ebd2d51a540408659367828cd2548cbc498515b2 | |
| parent | 490113d40db4fc3c291501941a06b3846ace1bb2 (diff) | |
configure.ac: Disable rtpmanager for now because it depends on CVS -base.
Original commit message from CVS:
* configure.ac:
Disable rtpmanager for now because it depends on CVS -base.
* gst/rtpmanager/Makefile.am:
Added new files for session manager.
* gst/rtpmanager/gstrtpjitterbuffer.h:
* gst/rtpmanager/gstrtpbin.c: (create_session), (get_pt_map),
(create_stream), (pt_map_requested), (new_ssrc_pad_found):
Some cleanups.
the session manager can now also request a pt-map.
* gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_base_init),
(gst_rtp_session_class_init), (gst_rtp_session_init),
(gst_rtp_session_finalize), (rtcp_thread), (start_rtcp_thread),
(stop_rtcp_thread), (gst_rtp_session_change_state),
(gst_rtp_session_process_rtp), (gst_rtp_session_send_rtp),
(gst_rtp_session_send_rtcp), (gst_rtp_session_clock_rate),
(gst_rtp_session_get_time), (gst_rtp_session_event_recv_rtp_sink),
(gst_rtp_session_chain_recv_rtp),
(gst_rtp_session_event_recv_rtcp_sink),
(gst_rtp_session_chain_recv_rtcp),
(gst_rtp_session_event_send_rtp_sink),
(gst_rtp_session_chain_send_rtp), (create_send_rtcp_src),
(gst_rtp_session_request_new_pad):
* gst/rtpmanager/gstrtpsession.h:
We can ask for pt-map now too when the session manager needs it.
Hook up to the new session manager, implement the needed callbacks for
pushing data, getting clock time and requesting clock-rates.
Rename rtcp_src to send_rtcp_src to make it clear that this RTCP is to
be send to clients.
Add code to start and stop the thread that will schedule RTCP through
the session manager.
* gst/rtpmanager/rtpsession.c: (rtp_session_class_init),
(rtp_session_init), (rtp_session_finalize),
(rtp_session_set_property), (rtp_session_get_property),
(on_new_ssrc), (on_ssrc_collision), (on_ssrc_validated),
(on_bye_ssrc), (rtp_session_new), (rtp_session_set_callbacks),
(rtp_session_set_bandwidth), (rtp_session_get_bandwidth),
(rtp_session_set_rtcp_bandwidth), (rtp_session_get_rtcp_bandwidth),
(source_push_rtp), (source_clock_rate), (check_collision),
(obtain_source), (rtp_session_add_source),
(rtp_session_get_num_sources),
(rtp_session_get_num_active_sources),
(rtp_session_get_source_by_ssrc),
(rtp_session_get_source_by_cname), (rtp_session_create_source),
(update_arrival_stats), (rtp_session_process_rtp),
(rtp_session_process_sr), (rtp_session_process_rr),
(rtp_session_process_sdes), (rtp_session_process_bye),
(rtp_session_process_app), (rtp_session_process_rtcp),
(rtp_session_send_rtp), (rtp_session_get_rtcp_interval),
(rtp_session_produce_rtcp):
* gst/rtpmanager/rtpsession.h:
The advanced beginnings of the main session manager that handles the
participant database of RTPSources, SSRC probation, SSRC collisions,
parse RTCP to update source stats. etc..
* gst/rtpmanager/rtpsource.c: (rtp_source_class_init),
(rtp_source_init), (rtp_source_finalize), (rtp_source_new),
(rtp_source_set_callbacks), (rtp_source_set_as_csrc),
(rtp_source_set_rtp_from), (rtp_source_set_rtcp_from),
(push_packet), (get_clock_rate), (calculate_jitter),
(rtp_source_process_rtp), (rtp_source_process_bye),
(rtp_source_send_rtp), (rtp_source_process_sr),
(rtp_source_process_rb):
* gst/rtpmanager/rtpsource.h:
Object that encapsulates an SSRC and its state in the database.
Calculates the jitter and transit times of data packets.
* gst/rtpmanager/rtpstats.c: (rtp_stats_init_defaults),
(rtp_stats_calculate_rtcp_interval), (rtp_stats_add_rtcp_jitter):
* gst/rtpmanager/rtpstats.h:
Various stats regarding the session and sources.
Used to calculate the RTCP interval.
| -rw-r--r-- | gst/rtpmanager/Makefile.am | 9 | ||||
| -rw-r--r-- | gst/rtpmanager/gstrtpbin.c | 28 | ||||
| -rw-r--r-- | gst/rtpmanager/gstrtpjitterbuffer.h | 1 | ||||
| -rw-r--r-- | gst/rtpmanager/gstrtpsession.c | 325 | ||||
| -rw-r--r-- | gst/rtpmanager/gstrtpsession.h | 6 | ||||
| -rw-r--r-- | gst/rtpmanager/rtpsession.c | 1026 | ||||
| -rw-r--r-- | gst/rtpmanager/rtpsession.h | 206 | ||||
| -rw-r--r-- | gst/rtpmanager/rtpsource.c | 477 | ||||
| -rw-r--r-- | gst/rtpmanager/rtpsource.h | 162 | ||||
| -rw-r--r-- | gst/rtpmanager/rtpstats.c | 111 | ||||
| -rw-r--r-- | gst/rtpmanager/rtpstats.h | 161 | 
11 files changed, 2477 insertions, 35 deletions
diff --git a/gst/rtpmanager/Makefile.am b/gst/rtpmanager/Makefile.am index f844e47c..9e47cbdf 100644 --- a/gst/rtpmanager/Makefile.am +++ b/gst/rtpmanager/Makefile.am @@ -17,6 +17,9 @@ libgstrtpmanager_la_SOURCES = gstrtpmanager.c \  			      gstrtpjitterbuffer.c \  			      gstrtpptdemux.c \  			      gstrtpssrcdemux.c \ +			      rtpsession.c      \ +			      rtpsource.c      \ +			      rtpstats.c      \  			      gstrtpsession.c  nodist_libgstrtpmanager_la_SOURCES = \ @@ -28,11 +31,15 @@ noinst_HEADERS = gstrtpbin.h \  		 gstrtpjitterbuffer.h \                   gstrtpptdemux.h \                   gstrtpssrcdemux.h \ +		 rtpsession.h  \ +		 rtpsource.h  \ +		 rtpstats.h  \  		 gstrtpsession.h  libgstrtpmanager_la_CFLAGS = $(GST_CFLAGS) $(GST_PLUGINS_BASE_CFLAGS) $(ERROR_CFLAGS)  libgstrtpmanager_la_LIBADD = $(GST_LIBS_LIBS) -libgstrtpmanager_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_PLUGINS_BASE_LIBS) -lgstrtp-@GST_MAJORMINOR@ +libgstrtpmanager_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) $(GST_PLUGINS_BASE_LIBS) -lgstrtp-@GST_MAJORMINOR@ \ +			      -lgstnetbuffer-@GST_MAJORMINOR@  CLEANFILES = $(BUILT_SOURCES) diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index 6825e9cc..9162d76c 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -129,7 +129,7 @@ typedef struct _GstRTPBinClient GstRTPBinClient;  static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };  static GstCaps *pt_map_requested (GstElement * element, guint pt, -    GstRTPBinStream * stream); +    GstRTPBinSession * session);  /* Manages the RTP stream for one SSRC.   * @@ -215,9 +215,9 @@ static GstRTPBinSession *  create_session (GstRTPBin * rtpbin, gint id)  {    GstRTPBinSession *sess; -  GstElement *elem, *demux; +  GstElement *session, *demux; -  if (!(elem = gst_element_factory_make ("rtpsession", NULL))) +  if (!(session = gst_element_factory_make ("rtpsession", NULL)))      goto no_session;    if (!(demux = gst_element_factory_make ("rtpssrcdemux", NULL))) @@ -227,13 +227,17 @@ create_session (GstRTPBin * rtpbin, gint id)    sess->lock = g_mutex_new ();    sess->id = id;    sess->bin = rtpbin; -  sess->session = elem; +  sess->session = session;    sess->demux = demux;    sess->ptmap = g_hash_table_new (NULL, NULL);    rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess); -  gst_bin_add (GST_BIN_CAST (rtpbin), elem); -  gst_element_set_state (elem, GST_STATE_PLAYING); +  /* provide clock_rate to the session manager when needed */ +  g_signal_connect (session, "request-pt-map", +      (GCallback) pt_map_requested, sess); + +  gst_bin_add (GST_BIN_CAST (rtpbin), session); +  gst_element_set_state (session, GST_STATE_PLAYING);    gst_bin_add (GST_BIN_CAST (rtpbin), demux);    gst_element_set_state (demux, GST_STATE_PLAYING); @@ -247,7 +251,7 @@ no_session:    }  no_demux:    { -    gst_object_unref (elem); +    gst_object_unref (session);      g_warning ("rtpbin: could not create rtpssrcdemux element");      return NULL;    } @@ -351,7 +355,7 @@ create_stream (GstRTPBinSession * session, guint32 ssrc)    /* provide clock_rate to the jitterbuffer when needed */    g_signal_connect (buffer, "request-pt-map", -      (GCallback) pt_map_requested, stream); +      (GCallback) pt_map_requested, session);    gst_bin_add (GST_BIN_CAST (session->bin), buffer);    gst_element_set_state (buffer, GST_STATE_PLAYING); @@ -590,14 +594,12 @@ new_payload_found (GstElement * element, guint pt, GstPad * pad,  }  static GstCaps * -pt_map_requested (GstElement * element, guint pt, GstRTPBinStream * stream) +pt_map_requested (GstElement * element, guint pt, GstRTPBinSession * session)  {    GstRTPBin *rtpbin; -  GstRTPBinSession *session;    GstCaps *caps; -  rtpbin = stream->bin; -  session = stream->session; +  rtpbin = session->bin;    GST_DEBUG_OBJECT (rtpbin, "payload map requested for pt %d in session %d", pt,        session->id); @@ -647,7 +649,7 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,     * demuxer so that it can apply a proper caps on the buffers for the     * depayloaders. */    stream->demux_ptreq_sig = g_signal_connect (stream->demux, -      "request-pt-map", (GCallback) pt_map_requested, stream); +      "request-pt-map", (GCallback) pt_map_requested, session);    GST_RTP_SESSION_UNLOCK (session); diff --git a/gst/rtpmanager/gstrtpjitterbuffer.h b/gst/rtpmanager/gstrtpjitterbuffer.h index e101039a..3cbcd62f 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.h +++ b/gst/rtpmanager/gstrtpjitterbuffer.h @@ -63,6 +63,7 @@ struct _GstRTPJitterBufferClass  {    GstElementClass parent_class; +  /* signals */    GstCaps* (*request_pt_map) (GstRTPJitterBuffer *buffer, guint pt);    /*< private > */ diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index cdad7e9b..03b0802b 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -39,7 +39,10 @@  #ifdef HAVE_CONFIG_H  #include "config.h"  #endif + +#include "gstrtpbin-marshal.h"  #include "gstrtpsession.h" +#include "rtpsession.h"  GST_DEBUG_CATEGORY_STATIC (gst_rtp_session_debug);  #define GST_CAT_DEFAULT gst_rtp_session_debug @@ -95,8 +98,8 @@ GST_STATIC_PAD_TEMPLATE ("send_rtp_src",      GST_STATIC_CAPS ("application/x-rtp")      ); -static GstStaticPadTemplate rtpsession_rtcp_src_template = -GST_STATIC_PAD_TEMPLATE ("rtcp_src", +static GstStaticPadTemplate rtpsession_send_rtcp_src_template = +GST_STATIC_PAD_TEMPLATE ("send_rtcp_src",      GST_PAD_SRC,      GST_PAD_REQUEST,      GST_STATIC_CAPS ("application/x-rtcp") @@ -105,7 +108,7 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_src",  /* signals and args */  enum  { -  /* FILL ME */ +  SIGNAL_REQUEST_PT_MAP,    LAST_SIGNAL  }; @@ -123,6 +126,31 @@ enum  struct _GstRTPSessionPrivate  {    GMutex *lock; +  RTPSession *session; +  /* thread for sending out RTCP */ +  GstClockID id; +  gboolean stop_thread; +  GThread *thread; +}; + +/* callbacks to handle actions from the session manager */ +static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess, +    RTPSource * src, GstBuffer * buffer, gpointer user_data); +static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess, +    RTPSource * src, GstBuffer * buffer, gpointer user_data); +static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess, +    RTPSource * src, GstBuffer * buffer, gpointer user_data); +static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, +    gpointer user_data); +static GstClockTime gst_rtp_session_get_time (RTPSession * sess, +    gpointer user_data); + +static RTPSessionCallbacks callbacks = { +  gst_rtp_session_process_rtp, +  gst_rtp_session_send_rtp, +  gst_rtp_session_send_rtcp, +  gst_rtp_session_clock_rate, +  gst_rtp_session_get_time  };  /* GObject vmethods */ @@ -139,7 +167,7 @@ static GstPad *gst_rtp_session_request_new_pad (GstElement * element,      GstPadTemplate * templ, const gchar * name);  static void gst_rtp_session_release_pad (GstElement * element, GstPad * pad); -/*static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 }; */ +static guint gst_rtp_session_signals[LAST_SIGNAL] = { 0 };  GST_BOILERPLATE (GstRTPSession, gst_rtp_session, GstElement, GST_TYPE_ELEMENT); @@ -164,7 +192,7 @@ gst_rtp_session_base_init (gpointer klass)    gst_element_class_add_pad_template (element_class,        gst_static_pad_template_get (&rtpsession_send_rtp_src_template));    gst_element_class_add_pad_template (element_class, -      gst_static_pad_template_get (&rtpsession_rtcp_src_template)); +      gst_static_pad_template_get (&rtpsession_send_rtcp_src_template));    gst_element_class_set_details (element_class, &rtpsession_details);  } @@ -184,6 +212,19 @@ gst_rtp_session_class_init (GstRTPSessionClass * klass)    gobject_class->set_property = gst_rtp_session_set_property;    gobject_class->get_property = gst_rtp_session_get_property; +  /** +   * GstRTPSession::request-pt-map: +   * @sess: the object which received the signal +   * @pt: the pt +   * +   * Request the payload type as #GstCaps for @pt. +   */ +  gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP] = +      g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass), +      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRTPSessionClass, request_pt_map), +      NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT, GST_TYPE_CAPS, 1, +      G_TYPE_UINT); +    gstelement_class->change_state =        GST_DEBUG_FUNCPTR (gst_rtp_session_change_state);    gstelement_class->request_new_pad = @@ -200,6 +241,9 @@ gst_rtp_session_init (GstRTPSession * rtpsession, GstRTPSessionClass * klass)  {    rtpsession->priv = GST_RTP_SESSION_GET_PRIVATE (rtpsession);    rtpsession->priv->lock = g_mutex_new (); +  rtpsession->priv->session = rtp_session_new (); +  /* configure callbacks */ +  rtp_session_set_callbacks (rtpsession->priv->session, &callbacks, rtpsession);  }  static void @@ -209,6 +253,7 @@ gst_rtp_session_finalize (GObject * object)    rtpsession = GST_RTP_SESSION (object);    g_mutex_free (rtpsession->priv->lock); +  g_object_unref (rtpsession->priv->session);    G_OBJECT_CLASS (parent_class)->finalize (object);  } @@ -243,6 +288,87 @@ gst_rtp_session_get_property (GObject * object, guint prop_id,    }  } +static void +rtcp_thread (GstRTPSession * rtpsession) +{ +  GstClock *clock; +  GstClockID id; + +  clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession)); +  if (clock == NULL) +    return; + +  GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread"); + +  GST_RTP_SESSION_LOCK (rtpsession); +  while (!rtpsession->priv->stop_thread) { +    gdouble timeout; +    GstClockTime target; + +    timeout = rtp_session_get_rtcp_interval (rtpsession->priv->session); +    GST_DEBUG_OBJECT (rtpsession, "next RTCP timeout: %lf", timeout); + +    target = gst_clock_get_time (clock); +    target += GST_SECOND * timeout; +    id = rtpsession->priv->id = gst_clock_new_single_shot_id (clock, target); +    GST_RTP_SESSION_UNLOCK (rtpsession); + +    gst_clock_id_wait (id, NULL); + +    GST_DEBUG_OBJECT (rtpsession, "got RTCP timeout"); + +    /* make the session manager produce RTCP, we ignore the result. */ +    rtp_session_produce_rtcp (rtpsession->priv->session); + +    GST_RTP_SESSION_LOCK (rtpsession); +    gst_clock_id_unref (id); +    rtpsession->priv->id = NULL; +  } +  GST_RTP_SESSION_UNLOCK (rtpsession); + +  gst_object_unref (clock); + +  GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread"); +} + +static gboolean +start_rtcp_thread (GstRTPSession * rtpsession) +{ +  GError *error = NULL; +  gboolean res; + +  GST_DEBUG_OBJECT (rtpsession, "starting RTCP thread"); + +  GST_RTP_SESSION_LOCK (rtpsession); +  rtpsession->priv->stop_thread = FALSE; +  rtpsession->priv->thread = +      g_thread_create ((GThreadFunc) rtcp_thread, rtpsession, TRUE, &error); +  GST_RTP_SESSION_UNLOCK (rtpsession); + +  if (error != NULL) { +    res = FALSE; +    GST_DEBUG_OBJECT (rtpsession, "failed to start thread, %s", error->message); +    g_error_free (error); +  } else { +    res = TRUE; +  } +  return res; +} + +static void +stop_rtcp_thread (GstRTPSession * rtpsession) +{ +  GST_DEBUG_OBJECT (rtpsession, "stopping RTCP thread"); + +  GST_RTP_SESSION_LOCK (rtpsession); +  rtpsession->priv->stop_thread = TRUE; +  if (rtpsession->priv->id) +    gst_clock_id_unschedule (rtpsession->priv->id); +  GST_RTP_SESSION_UNLOCK (rtpsession); + +  g_thread_join (rtpsession->priv->thread); +} +  static GstStateChangeReturn  gst_rtp_session_change_state (GstElement * element, GstStateChange transition)  { @@ -258,6 +384,8 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)        break;      case GST_STATE_CHANGE_PAUSED_TO_PLAYING:        break; +    case GST_STATE_CHANGE_PLAYING_TO_PAUSED: +      stop_rtcp_thread (rtpsession);      default:        break;    } @@ -265,6 +393,10 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)    res = parent_class->change_state (element, transition);    switch (transition) { +    case GST_STATE_CHANGE_PAUSED_TO_PLAYING: +      if (!start_rtcp_thread (rtpsession)) +        goto failed_thread; +      break;      case GST_STATE_CHANGE_PLAYING_TO_PAUSED:        break;      case GST_STATE_CHANGE_PAUSED_TO_READY: @@ -275,15 +407,158 @@ gst_rtp_session_change_state (GstElement * element, GstStateChange transition)        break;    }    return res; + +  /* ERRORS */ +failed_thread: +  { +    return GST_STATE_CHANGE_FAILURE; +  } +} + +/* called when the session manager has an RTP packet ready for further + * processing */ +static GstFlowReturn +gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src, +    GstBuffer * buffer, gpointer user_data) +{ +  GstFlowReturn result; +  GstRTPSession *rtpsession; +  GstRTPSessionPrivate *priv; + +  rtpsession = GST_RTP_SESSION (user_data); +  priv = rtpsession->priv; + +  if (rtpsession->recv_rtp_src) { +    result = gst_pad_push (rtpsession->recv_rtp_src, buffer); +  } else { +    gst_buffer_unref (buffer); +    result = GST_FLOW_OK; +  } +  return result; +} + +/* called when the session manager has an RTP packet ready for further + * sending */ +static GstFlowReturn +gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src, +    GstBuffer * buffer, gpointer user_data) +{ +  GstFlowReturn result; +  GstRTPSession *rtpsession; +  GstRTPSessionPrivate *priv; + +  rtpsession = GST_RTP_SESSION (user_data); +  priv = rtpsession->priv; + +  if (rtpsession->send_rtp_src) { +    result = gst_pad_push (rtpsession->send_rtp_src, buffer); +  } else { +    gst_buffer_unref (buffer); +    result = GST_FLOW_OK; +  } +  return result; +} + +/* called when the session manager has an RTCP packet ready for further + * sending */ +static GstFlowReturn +gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, +    GstBuffer * buffer, gpointer user_data) +{ +  GstFlowReturn result; +  GstRTPSession *rtpsession; +  GstRTPSessionPrivate *priv; + +  rtpsession = GST_RTP_SESSION (user_data); +  priv = rtpsession->priv; + +  if (rtpsession->send_rtcp_src) { +    result = gst_pad_push (rtpsession->send_rtcp_src, buffer); +  } else { +    gst_buffer_unref (buffer); +    result = GST_FLOW_OK; +  } +  return result; +} + + +/* called when the session manager needs the clock rate */ +static gint +gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, +    gpointer user_data) +{ +  gint result = -1; +  GstRTPSession *rtpsession; +  GValue ret = { 0 }; +  GValue args[2] = { {0}, {0} }; +  GstCaps *caps; +  const GstStructure *caps_struct; + +  rtpsession = GST_RTP_SESSION_CAST (user_data); + +  g_value_init (&args[0], GST_TYPE_ELEMENT); +  g_value_set_object (&args[0], rtpsession); +  g_value_init (&args[1], G_TYPE_UINT); +  g_value_set_uint (&args[1], payload); + +  g_value_init (&ret, GST_TYPE_CAPS); +  g_value_set_boxed (&ret, NULL); + +  g_signal_emitv (args, gst_rtp_session_signals[SIGNAL_REQUEST_PT_MAP], 0, +      &ret); + +  caps = (GstCaps *) g_value_get_boxed (&ret); +  if (!caps) +    goto no_caps; + +  caps_struct = gst_caps_get_structure (caps, 0); +  if (!gst_structure_get_int (caps_struct, "clock-rate", &result)) +    goto no_clock_rate; + +  return result; + +  /* ERRORS */ +no_caps: +  { +    GST_DEBUG_OBJECT (rtpsession, "could not get caps"); +    return -1; +  } +no_clock_rate: +  { +    GST_DEBUG_OBJECT (rtpsession, "could not clock-rate from caps"); +    return -1; +  } +} + +/* called when the session manager needs the time of clock */ +static GstClockTime +gst_rtp_session_get_time (RTPSession * sess, gpointer user_data) +{ +  GstClockTime result; +  GstRTPSession *rtpsession; +  GstClock *clock; + +  rtpsession = GST_RTP_SESSION_CAST (user_data); + +  clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession)); +  if (clock) { +    result = gst_clock_get_time (clock); +    gst_object_unref (clock); +  } else +    result = GST_CLOCK_TIME_NONE; + +  return result;  }  static GstFlowReturn  gst_rtp_session_event_recv_rtp_sink (GstPad * pad, GstEvent * event)  {    GstRTPSession *rtpsession; +  GstRTPSessionPrivate *priv;    gboolean ret = FALSE;    rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); +  priv = rtpsession->priv;    GST_DEBUG_OBJECT (rtpsession, "received event %s",        GST_EVENT_TYPE_NAME (event)); @@ -305,14 +580,15 @@ static GstFlowReturn  gst_rtp_session_chain_recv_rtp (GstPad * pad, GstBuffer * buffer)  {    GstRTPSession *rtpsession; +  GstRTPSessionPrivate *priv;    GstFlowReturn ret;    rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); +  priv = rtpsession->priv;    GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); -  /* FIXME, do something */ -  ret = gst_pad_push (rtpsession->recv_rtp_src, buffer); +  ret = rtp_session_process_rtp (priv->session, buffer);    gst_object_unref (rtpsession); @@ -323,9 +599,11 @@ static GstFlowReturn  gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstEvent * event)  {    GstRTPSession *rtpsession; +  GstRTPSessionPrivate *priv;    gboolean ret = FALSE;    rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); +  priv = rtpsession->priv;    GST_DEBUG_OBJECT (rtpsession, "received event %s",        GST_EVENT_TYPE_NAME (event)); @@ -347,14 +625,15 @@ static GstFlowReturn  gst_rtp_session_chain_recv_rtcp (GstPad * pad, GstBuffer * buffer)  {    GstRTPSession *rtpsession; +  GstRTPSessionPrivate *priv;    GstFlowReturn ret;    rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); +  priv = rtpsession->priv; -  /* FIXME, do something */    GST_DEBUG_OBJECT (rtpsession, "received RTCP packet"); -  ret = gst_pad_push (rtpsession->sync_src, buffer); +  ret = rtp_session_process_rtcp (priv->session, buffer);    gst_object_unref (rtpsession); @@ -365,9 +644,11 @@ static GstFlowReturn  gst_rtp_session_event_send_rtp_sink (GstPad * pad, GstEvent * event)  {    GstRTPSession *rtpsession; +  GstRTPSessionPrivate *priv;    gboolean ret = FALSE;    rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); +  priv = rtpsession->priv;    GST_DEBUG_OBJECT (rtpsession, "received event"); @@ -388,14 +669,15 @@ static GstFlowReturn  gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer)  {    GstRTPSession *rtpsession; +  GstRTPSessionPrivate *priv;    GstFlowReturn ret;    rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); +  priv = rtpsession->priv;    GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); -  /* FIXME, do something */ -  ret = gst_pad_push (rtpsession->send_rtp_src, buffer); +  ret = rtp_session_send_rtp (priv->session, buffer);    gst_object_unref (rtpsession); @@ -494,16 +776,18 @@ create_send_rtp_sink (GstRTPSession * rtpsession)   * RTCP packets.   */  static GstPad * -create_rtcp_src (GstRTPSession * rtpsession) +create_send_rtcp_src (GstRTPSession * rtpsession)  {    GST_DEBUG_OBJECT (rtpsession, "creating pad"); -  rtpsession->rtcp_src = -      gst_pad_new_from_static_template (&rtpsession_rtcp_src_template, NULL); -  gst_pad_set_active (rtpsession->rtcp_src, TRUE); -  gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->rtcp_src); +  rtpsession->send_rtcp_src = +      gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template, +      NULL); +  gst_pad_set_active (rtpsession->send_rtcp_src, TRUE); +  gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), +      rtpsession->send_rtcp_src); -  return rtpsession->rtcp_src; +  return rtpsession->send_rtcp_src;  }  static GstPad * @@ -542,11 +826,12 @@ gst_rtp_session_request_new_pad (GstElement * element,        goto exists;      result = create_send_rtp_sink (rtpsession); -  } else if (templ == gst_element_class_get_pad_template (klass, "rtcp_src")) { -    if (rtpsession->rtcp_src != NULL) +  } else if (templ == gst_element_class_get_pad_template (klass, +          "send_rtcp_src")) { +    if (rtpsession->send_rtcp_src != NULL)        goto exists; -    result = create_rtcp_src (rtpsession); +    result = create_send_rtcp_src (rtpsession);    } else      goto wrong_template; diff --git a/gst/rtpmanager/gstrtpsession.h b/gst/rtpmanager/gstrtpsession.h index 8b343064..25bbb6eb 100644 --- a/gst/rtpmanager/gstrtpsession.h +++ b/gst/rtpmanager/gstrtpsession.h @@ -32,6 +32,7 @@    (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTP_SESSION))  #define GST_IS_RTP_SESSION_CLASS(klass) \    (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTP_SESSION)) +#define GST_RTP_SESSION_CAST(obj) ((GstRTPSession *)(obj))  typedef struct _GstRTPSession GstRTPSession;  typedef struct _GstRTPSessionClass GstRTPSessionClass; @@ -48,13 +49,16 @@ struct _GstRTPSession {    GstPad        *recv_rtp_src;    GstPad        *sync_src;    GstPad        *send_rtp_src; -  GstPad        *rtcp_src; +  GstPad        *send_rtcp_src;    GstRTPSessionPrivate *priv;  };  struct _GstRTPSessionClass {    GstElementClass parent_class; + +  /* signals */ +  GstCaps* (*request_pt_map) (GstRTPSession *sess, guint pt);  };  GType gst_rtp_session_get_type (void); diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c new file mode 100644 index 00000000..2283dc97 --- /dev/null +++ b/gst/rtpmanager/rtpsession.c @@ -0,0 +1,1026 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include <string.h> + +#include <gst/rtp/gstrtpbuffer.h> +#include <gst/rtp/gstrtcpbuffer.h> +#include <gst/netbuffer/gstnetbuffer.h> + +#include "rtpsession.h" + +GST_DEBUG_CATEGORY_STATIC (rtp_session_debug); +#define GST_CAT_DEFAULT rtp_session_debug + +/* signals and args */ +enum +{ +  SIGNAL_ON_NEW_SSRC, +  SIGNAL_ON_SSRC_COLLISION, +  SIGNAL_ON_SSRC_VALIDATED, +  SIGNAL_ON_BYE_SSRC, +  LAST_SIGNAL +}; + +#define RTP_DEFAULT_BANDWIDTH        64000.0 +#define RTP_DEFAULT_RTCP_BANDWIDTH   1000 + +enum +{ +  PROP_0 +}; + +/* GObject vmethods */ +static void rtp_session_finalize (GObject * object); +static void rtp_session_set_property (GObject * object, guint prop_id, +    const GValue * value, GParamSpec * pspec); +static void rtp_session_get_property (GObject * object, guint prop_id, +    GValue * value, GParamSpec * pspec); + +static guint rtp_session_signals[LAST_SIGNAL] = { 0 }; + +G_DEFINE_TYPE (RTPSession, rtp_session, G_TYPE_OBJECT); + +static void +rtp_session_class_init (RTPSessionClass * klass) +{ +  GObjectClass *gobject_class; + +  gobject_class = (GObjectClass *) klass; + +  gobject_class->finalize = rtp_session_finalize; +  gobject_class->set_property = rtp_session_set_property; +  gobject_class->get_property = rtp_session_get_property; + +  /** +   * RTPSession::on-new-ssrc: +   * @session: the object which received the signal +   * @src: the new RTPSource +   * +   * Notify of a new SSRC that entered @session. +   */ +  rtp_session_signals[SIGNAL_ON_NEW_SSRC] = +      g_signal_new ("on-new-ssrc", G_TYPE_FROM_CLASS (klass), +      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_new_ssrc), +      NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, +      G_TYPE_OBJECT); +  /** +   * RTPSession::on-ssrc_collision: +   * @session: the object which received the signal +   * @src: the #RTPSource that caused a collision +   * +   * Notify when we have an SSRC collision +   */ +  rtp_session_signals[SIGNAL_ON_SSRC_COLLISION] = +      g_signal_new ("on-ssrc-collision", G_TYPE_FROM_CLASS (klass), +      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_collision), +      NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, +      G_TYPE_OBJECT); +  /** +   * RTPSession::on-ssrc_validated: +   * @session: the object which received the signal +   * @src: the new validated RTPSource +   * +   * Notify of a new SSRC that became validated. +   */ +  rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED] = +      g_signal_new ("on-ssrc-validated", G_TYPE_FROM_CLASS (klass), +      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_ssrc_validated), +      NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, +      G_TYPE_OBJECT); +  /** +   * RTPSession::on-bye-ssrc: +   * @session: the object which received the signal +   * @src: the RTPSource that went away +   * +   * Notify of an SSRC that became inactive because of a BYE packet. +   */ +  rtp_session_signals[SIGNAL_ON_BYE_SSRC] = +      g_signal_new ("on-bye-ssrc", G_TYPE_FROM_CLASS (klass), +      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (RTPSessionClass, on_bye_ssrc), +      NULL, NULL, g_cclosure_marshal_VOID__OBJECT, G_TYPE_NONE, 1, +      G_TYPE_OBJECT); + +  GST_DEBUG_CATEGORY_INIT (rtp_session_debug, "rtpsession", 0, "RTP Session"); +} + +static void +rtp_session_init (RTPSession * sess) +{ +  sess->lock = g_mutex_new (); +  sess->ssrcs = +      g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) g_object_unref); +  sess->cnames = g_hash_table_new_full (NULL, NULL, g_free, NULL); + +  /* create an SSRC for this session manager */ +  sess->source = rtp_session_create_source (sess); + +  rtp_stats_init_defaults (&sess->stats); + +  /* default UDP header length */ +  sess->header_len = 28; + +  GST_DEBUG ("%p: session using SSRC: %08x", sess, sess->source->ssrc); +} + +static void +rtp_session_finalize (GObject * object) +{ +  RTPSession *sess; + +  sess = RTP_SESSION_CAST (object); + +  g_mutex_free (sess->lock); +  g_hash_table_unref (sess->ssrcs); +  g_hash_table_unref (sess->cnames); +  g_object_unref (sess->source); + +  G_OBJECT_CLASS (rtp_session_parent_class)->finalize (object); +} + +static void +rtp_session_set_property (GObject * object, guint prop_id, +    const GValue * value, GParamSpec * pspec) +{ +  RTPSession *sess; + +  sess = RTP_SESSION (object); + +  switch (prop_id) { +    default: +      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); +      break; +  } +} + +static void +rtp_session_get_property (GObject * object, guint prop_id, +    GValue * value, GParamSpec * pspec) +{ +  RTPSession *sess; + +  sess = RTP_SESSION (object); + +  switch (prop_id) { +    default: +      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); +      break; +  } +} + +static void +on_new_ssrc (RTPSession * sess, RTPSource * source) +{ +  g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_NEW_SSRC], 0, source); +} + +static void +on_ssrc_collision (RTPSession * sess, RTPSource * source) +{ +  g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_COLLISION], 0, +      source); +} + +static void +on_ssrc_validated (RTPSession * sess, RTPSource * source) +{ +  g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_SSRC_VALIDATED], 0, +      source); +} + +static void +on_bye_ssrc (RTPSession * sess, RTPSource * source) +{ +  g_signal_emit (sess, rtp_session_signals[SIGNAL_ON_BYE_SSRC], 0, source); +} + +/** + * rtp_session_new: + * + * Create a new session object. + * + * Returns: a new #RTPSession. g_object_unref() after usage. + */ +RTPSession * +rtp_session_new (void) +{ +  RTPSession *sess; + +  sess = g_object_new (RTP_TYPE_SESSION, NULL); + +  return sess; +} + +/** + * rtp_session_set_callbacks: + * @sess: an #RTPSession + * @callbacks: callbacks to configure + * @user_data: user data passed in the callbacks + * + * Configure a set of callbacks to be notified of actions. + */ +void +rtp_session_set_callbacks (RTPSession * sess, RTPSessionCallbacks * callbacks, +    gpointer user_data) +{ +  g_return_if_fail (RTP_IS_SESSION (sess)); + +  sess->callbacks.process_rtp = callbacks->process_rtp; +  sess->callbacks.send_rtp = callbacks->send_rtp; +  sess->callbacks.send_rtcp = callbacks->send_rtcp; +  sess->callbacks.clock_rate = callbacks->clock_rate; +  sess->callbacks.get_time = callbacks->get_time; +  sess->user_data = user_data; +} + +/** + * rtp_session_set_bandwidth: + * @sess: an #RTPSession + * @bandwidth: the bandwidth allocated + * + * Set the session bandwidth in bytes per second. + */ +void +rtp_session_set_bandwidth (RTPSession * sess, gdouble bandwidth) +{ +  g_return_if_fail (RTP_IS_SESSION (sess)); + +  sess->stats.bandwidth = bandwidth; +} + +/** + * rtp_session_get_bandwidth: + * @sess: an #RTPSession + * + * Get the session bandwidth. + * + * Returns: the session bandwidth. + */ +gdouble +rtp_session_get_bandwidth (RTPSession * sess) +{ +  g_return_val_if_fail (RTP_IS_SESSION (sess), 0); + +  return sess->stats.bandwidth; +} + +/** + * rtp_session_set_rtcp_bandwidth: + * @sess: an #RTPSession + * @bandwidth: the RTCP bandwidth + * + * Set the bandwidth that should be used for RTCP + * messages.  + */ +void +rtp_session_set_rtcp_bandwidth (RTPSession * sess, gdouble bandwidth) +{ +  g_return_if_fail (RTP_IS_SESSION (sess)); + +  sess->stats.rtcp_bandwidth = bandwidth; +} + +/** + * rtp_session_get_rtcp_bandwidth: + * @sess: an #RTPSession + * + * Get the session bandwidth used for RTCP. + * + * Returns: The bandwidth used for RTCP messages. + */ +gdouble +rtp_session_get_rtcp_bandwidth (RTPSession * sess) +{ +  g_return_val_if_fail (RTP_IS_SESSION (sess), 0.0); + +  return sess->stats.rtcp_bandwidth; +} + +static GstFlowReturn +source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session) +{ +  GstFlowReturn result = GST_FLOW_OK; + +  if (source == session->source) { +    GST_DEBUG ("source %08x pushed sender RTP packet", source->ssrc); +    if (session->callbacks.send_rtp) +      result = +          session->callbacks.send_rtp (session, source, buffer, +          session->user_data); +    else +      gst_buffer_unref (buffer); +  } else { +    GST_DEBUG ("source %08x pushed receiver RTP packet", source->ssrc); +    if (session->callbacks.process_rtp) +      result = +          session->callbacks.process_rtp (session, source, buffer, +          session->user_data); +    else +      gst_buffer_unref (buffer); +  } +  return result; +} + +static gint +source_clock_rate (RTPSource * source, guint8 pt, RTPSession * session) +{ +  gint result; + +  if (session->callbacks.clock_rate) +    result = session->callbacks.clock_rate (session, pt, session->user_data); +  else +    result = -1; + +  GST_DEBUG ("got clock-rate %d for pt %d", result, pt); + +  return result; +} + +static RTPSourceCallbacks callbacks = { +  (RTPSourcePushRTP) source_push_rtp, +  (RTPSourceClockRate) source_clock_rate, +}; + +static gboolean +check_collision (RTPSession * sess, RTPSource * source, +    RTPArrivalStats * arrival) +{ +  /* FIXME, do collision check */ +  return FALSE; +} + +static RTPSource * +obtain_source (RTPSession * sess, guint32 ssrc, gboolean * created, +    RTPArrivalStats * arrival, gboolean rtp) +{ +  RTPSource *source; + +  source = g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (ssrc)); +  if (source == NULL) { +    /* make new Source in probation and insert */ +    source = rtp_source_new (ssrc); + +    if (rtp) +      source->probation = RTP_DEFAULT_PROBATION; +    else +      source->probation = 0; + +    /* store from address, if any */ +    if (arrival->have_address) { +      if (rtp) +        rtp_source_set_rtp_from (source, &arrival->address); +      else +        rtp_source_set_rtcp_from (source, &arrival->address); +    } + +    /* configure a callback on the source */ +    rtp_source_set_callbacks (source, &callbacks, sess); + +    g_hash_table_insert (sess->ssrcs, GINT_TO_POINTER (ssrc), source); + +    /* we have one more source now */ +    sess->total_sources++; +    *created = TRUE; +  } else { +    *created = FALSE; +    /* check for collision, this updates the address when not previously set */ +    if (check_collision (sess, source, arrival)) +      on_ssrc_collision (sess, source); +  } +  return source; +} + +/** + * rtp_session_add_source: + * @sess: a #RTPSession + * @src: #RTPSource to add + * + * Add @src to @session. + * + * Returns: %TRUE on success, %FALSE if a source with the same SSRC already + * existed in the session. + */ +gboolean +rtp_session_add_source (RTPSession * sess, RTPSource * src) +{ +  gboolean result = FALSE; +  RTPSource *find; + +  g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE); +  g_return_val_if_fail (src != NULL, FALSE); + +  RTP_SESSION_LOCK (sess); +  find = g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (src->ssrc)); +  if (find == NULL) { +    g_hash_table_insert (sess->ssrcs, GINT_TO_POINTER (src->ssrc), src); +    /* we have one more source now */ +    sess->total_sources++; +    result = TRUE; +  } +  RTP_SESSION_UNLOCK (sess); + +  return result; +} + +/** + * rtp_session_get_num_sources: + * @sess: an #RTPSession + * + * Get the number of sources in @sess. + * + * Returns: The number of sources in @sess. + */ +gint +rtp_session_get_num_sources (RTPSession * sess) +{ +  gint result; + +  g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE); + +  RTP_SESSION_LOCK (sess); +  result = sess->total_sources; +  RTP_SESSION_UNLOCK (sess); + +  return result; +} + +/** + * rtp_session_get_num_active_sources: + * @sess: an #RTPSession + * + * Get the number of active sources in @sess. A source is considered active when + * it has been validated and has not yet received a BYE RTCP message. + * + * Returns: The number of active sources in @sess. + */ +gint +rtp_session_get_num_active_sources (RTPSession * sess) +{ +  gint result; + +  g_return_val_if_fail (RTP_IS_SESSION (sess), FALSE); + +  RTP_SESSION_LOCK (sess); +  result = sess->stats.active_sources; +  RTP_SESSION_UNLOCK (sess); + +  return result; +} + +/** + * rtp_session_get_source_by_ssrc: + * @sess: an #RTPSession + * @ssrc: an SSRC + * + * Find the source with @ssrc in @sess. + * + * Returns: a #RTPSource with SSRC @ssrc or NULL if the source was not found. + * g_object_unref() after usage. + */ +RTPSource * +rtp_session_get_source_by_ssrc (RTPSession * sess, guint32 ssrc) +{ +  RTPSource *result; + +  g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); + +  RTP_SESSION_LOCK (sess); +  result = g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (ssrc)); +  if (result) +    g_object_ref (result); +  RTP_SESSION_UNLOCK (sess); + +  return result; +} + +/** + * rtp_session_get_source_by_cname: + * @sess: a #RTPSession + * @cname: an CNAME + * + * Find the source with @cname in @sess. + * + * Returns: a #RTPSource with CNAME @cname or NULL if the source was not found. + * g_object_unref() after usage. + */ +RTPSource * +rtp_session_get_source_by_cname (RTPSession * sess, const gchar * cname) +{ +  RTPSource *result; + +  g_return_val_if_fail (RTP_IS_SESSION (sess), NULL); +  g_return_val_if_fail (cname != NULL, NULL); + +  RTP_SESSION_LOCK (sess); +  result = g_hash_table_lookup (sess->cnames, cname); +  if (result) +    g_object_ref (result); +  RTP_SESSION_UNLOCK (sess); + +  return result; +} + +/** + * rtp_session_create_source: + * @sess: an #RTPSession + * + * Create an #RTPSource for use in @sess. This function will create a source + * with an ssrc that is currently not used by any participants in the session. + * + * Returns: an #RTPSource. + */ +RTPSource * +rtp_session_create_source (RTPSession * sess) +{ +  guint32 ssrc; +  RTPSource *source; + +  RTP_SESSION_LOCK (sess); +  while (TRUE) { +    ssrc = g_random_int (); + +    /* see if it exists in the session, we're done if it doesn't */ +    if (g_hash_table_lookup (sess->ssrcs, GINT_TO_POINTER (ssrc)) == NULL) +      break; +  } +  source = rtp_source_new (ssrc); +  g_hash_table_insert (sess->ssrcs, GINT_TO_POINTER (ssrc), source); +  /* we have one more source now */ +  sess->total_sources++; +  RTP_SESSION_UNLOCK (sess); + +  return source; +} + +/* update the RTPArrivalStats structure with the current time and other bits + * about the current buffer we are handling. + * This function is typically called when a validated packet is received. + */ +static void +update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, +    gboolean rtp, GstBuffer * buffer) +{ +  /* get time or arrival */ +  if (sess->callbacks.get_time) +    arrival->time = sess->callbacks.get_time (sess, sess->user_data); +  else +    arrival->time = GST_CLOCK_TIME_NONE; + +  /* update sizes */ +  arrival->bytes = GST_BUFFER_SIZE (buffer) + 28; +  arrival->payload_len = (rtp ? gst_rtp_buffer_get_payload_len (buffer) : 0); + +  /* for netbuffer we can store the IP address to check for collisions */ +  arrival->have_address = GST_IS_NETBUFFER (buffer); +  if (arrival->have_address) { +    GstNetBuffer *netbuf = (GstNetBuffer *) buffer; + +    memcpy (&arrival->address, &netbuf->from, sizeof (GstNetAddress)); +  } +} + +/** + * rtp_session_process_rtp: + * @sess: and #RTPSession + * @buffer: an RTP buffer + * + * Process an RTP buffer in the session manager. This function takes ownership + * of @buffer. + * + * Returns: a #GstFlowReturn. + */ +GstFlowReturn +rtp_session_process_rtp (RTPSession * sess, GstBuffer * buffer) +{ +  GstFlowReturn result; +  guint32 ssrc; +  RTPSource *source; +  gboolean created; +  gboolean prevsender, prevactive; +  RTPArrivalStats arrival; + +  g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); +  g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + +  if (!gst_rtp_buffer_validate (buffer)) +    goto invalid_packet; + +  /* update arrival stats */ +  update_arrival_stats (sess, &arrival, TRUE, buffer); + +  /* get SSRC and look up in session database */ +  ssrc = gst_rtp_buffer_get_ssrc (buffer); + +  RTP_SESSION_LOCK (sess); +  source = obtain_source (sess, ssrc, &created, &arrival, TRUE); + +  prevsender = RTP_SOURCE_IS_SENDER (source); +  prevactive = RTP_SOURCE_IS_ACTIVE (source); + +  /* let source process the packet */ +  result = rtp_source_process_rtp (source, buffer, &arrival); + +  /* source became active */ +  if (prevactive != RTP_SOURCE_IS_ACTIVE (source)) { +    sess->stats.active_sources++; +    GST_DEBUG ("source: %08x became active, %d active sources", ssrc, +        sess->stats.active_sources); +    on_ssrc_validated (sess, source); +  } +  if (prevsender != RTP_SOURCE_IS_SENDER (source)) { +    sess->stats.sender_sources++; +    GST_DEBUG ("source: %08x became sender, %d sender sources", ssrc, +        sess->stats.sender_sources); +  } + +  if (created) +    on_new_ssrc (sess, source); + +  /* for validated sources, we add the CSRCs as well */ +  if (source->validated) { +    guint8 i, count; + +    count = gst_rtp_buffer_get_csrc_count (buffer); + +    for (i = 0; i < count; i++) { +      guint32 csrc; +      RTPSource *csrc_src; + +      csrc = gst_rtp_buffer_get_csrc (buffer, i); + +      /* get source */ +      csrc_src = obtain_source (sess, csrc, &created, &arrival, TRUE); +      if (created) { +        GST_DEBUG ("created new CSRC: %08x", csrc); +        rtp_source_set_as_csrc (csrc_src); +        if (RTP_SOURCE_IS_ACTIVE (csrc_src)) +          sess->stats.active_sources++; +        on_new_ssrc (sess, source); +      } +    } +  } +  RTP_SESSION_UNLOCK (sess); + +  return result; + +  /* ERRORS */ +invalid_packet: +  { +    GST_DEBUG ("invalid RTP packet received"); +    return GST_FLOW_OK; +  } +} + +/* A Sender report contains statistics about how the sender is doing. This + * includes timing informataion about the relation between RTP and NTP + * timestamps is it using and the number of packets/bytes it sent to us. + * + * In this report is also included a set of report blocks related to how this + * sender is receiving data (in case we (or somebody else) is also sending stuff + * to it). This info includes the packet loss, jitter and seqnum. It also + * contains information to calculate the round trip time (LSR/DLSR). + */ +static void +rtp_session_process_sr (RTPSession * sess, GstRTCPPacket * packet, +    RTPArrivalStats * arrival) +{ +  guint32 senderssrc, rtptime, packet_count, octet_count; +  guint64 ntptime; +  guint count, i; +  RTPSource *source; +  gboolean created; + +  gst_rtcp_packet_sr_get_sender_info (packet, &senderssrc, &ntptime, &rtptime, +      &packet_count, &octet_count); + +  RTP_SESSION_LOCK (sess); +  source = obtain_source (sess, senderssrc, &created, arrival, FALSE); + +  /* first update the source */ +  rtp_source_process_sr (source, ntptime, rtptime, packet_count, octet_count); + +  if (created) +    on_new_ssrc (sess, source); + +  count = gst_rtcp_packet_get_rb_count (packet); +  for (i = 0; i < count; i++) { +    guint32 ssrc, exthighestseq, jitter, lsr, dlsr; +    guint8 fractionlost; +    gint32 packetslost; + +    gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost, +        &packetslost, &exthighestseq, &jitter, &lsr, &dlsr); + +    if (ssrc == sess->source->ssrc) { +      /* only deal with report blocks for our session, we update the stats of +       * the sender of the TCP message. We could also compare our stats against +       * the other sender to see if we are better or worse. */ +      rtp_source_process_rb (source, fractionlost, packetslost, +          exthighestseq, jitter, lsr, dlsr); +    } +  } +  RTP_SESSION_UNLOCK (sess); +} + +/* A receiver report contains statistics about how a receiver is doing. It + * includes stuff like packet loss, jitter and the seqnum it received last. It + * also contains info to calculate the round trip time. + * + * We are only interested in how the sender of this report is doing wrt to us. + */ +static void +rtp_session_process_rr (RTPSession * sess, GstRTCPPacket * packet, +    RTPArrivalStats * arrival) +{ +  guint32 senderssrc; +  guint count, i; +  RTPSource *source; +  gboolean created; + +  senderssrc = gst_rtcp_packet_rr_get_ssrc (packet); + +  GST_DEBUG ("got RR packet: SSRC %08x", senderssrc); + +  RTP_SESSION_LOCK (sess); +  source = obtain_source (sess, senderssrc, &created, arrival, FALSE); + +  if (created) +    on_new_ssrc (sess, source); + +  count = gst_rtcp_packet_get_rb_count (packet); +  for (i = 0; i < count; i++) { +    guint32 ssrc, exthighestseq, jitter, lsr, dlsr; +    guint8 fractionlost; +    gint32 packetslost; + +    gst_rtcp_packet_get_rb (packet, i, &ssrc, &fractionlost, +        &packetslost, &exthighestseq, &jitter, &lsr, &dlsr); + +    if (ssrc == sess->source->ssrc) { +      rtp_source_process_rb (source, fractionlost, packetslost, +          exthighestseq, jitter, lsr, dlsr); +    } +  } +  RTP_SESSION_UNLOCK (sess); +} + +/* FIXME, we're just printing this for now... */ +static void +rtp_session_process_sdes (RTPSession * sess, GstRTCPPacket * packet, +    RTPArrivalStats * arrival) +{ +  guint chunks, i, j; +  gboolean more_chunks, more_items; + +  chunks = gst_rtcp_packet_sdes_get_chunk_count (packet); +  GST_DEBUG ("got SDES packet with %d chunks", chunks); + +  more_chunks = gst_rtcp_packet_sdes_first_chunk (packet); +  i = 0; +  while (more_chunks) { +    guint32 ssrc; + +    ssrc = gst_rtcp_packet_sdes_get_ssrc (packet); + +    GST_DEBUG ("chunk %d, SSRC %08x", i, ssrc); + +    more_items = gst_rtcp_packet_sdes_first_item (packet); +    j = 0; +    while (more_items) { +      GstRTCPSDESType type; +      guint8 len; +      gchar *data; + +      gst_rtcp_packet_sdes_get_item (packet, &type, &len, &data); + +      GST_DEBUG ("item %d, type %d, len %d, data %s", j, type, len, data); + +      more_items = gst_rtcp_packet_sdes_next_item (packet); +      j++; +    } +    more_chunks = gst_rtcp_packet_sdes_next_chunk (packet); +    i++; +  } +} + +/* BYE is sent when a client leaves the session + */ +static void +rtp_session_process_bye (RTPSession * sess, GstRTCPPacket * packet, +    RTPArrivalStats * arrival) +{ +  guint count, i; +  gchar *reason; + +  reason = gst_rtcp_packet_bye_get_reason (packet); +  GST_DEBUG ("got BYE packet (reason: %s)", GST_STR_NULL (reason)); + +  count = gst_rtcp_packet_bye_get_ssrc_count (packet); +  for (i = 0; i < count; i++) { +    guint32 ssrc; +    RTPSource *source; +    gboolean created, prevactive, prevsender; + +    ssrc = gst_rtcp_packet_bye_get_nth_ssrc (packet, i); +    GST_DEBUG ("SSRC: %08x", ssrc); + +    /* find src and mark bye, no probation when dealing with RTCP */ +    RTP_SESSION_LOCK (sess); +    source = obtain_source (sess, ssrc, &created, arrival, FALSE); + +    prevactive = RTP_SOURCE_IS_ACTIVE (source); +    prevsender = RTP_SOURCE_IS_SENDER (source); + +    /* let the source handle the rest */ +    rtp_source_process_bye (source, reason); + +    if (prevactive && !RTP_SOURCE_IS_ACTIVE (source)) { +      sess->stats.active_sources--; +      GST_DEBUG ("source: %08x became inactive, %d active sources", ssrc, +          sess->stats.active_sources); +    } +    if (prevsender && !RTP_SOURCE_IS_SENDER (source)) { +      sess->stats.sender_sources--; +      GST_DEBUG ("source: %08x became non sender, %d sender sources", ssrc, +          sess->stats.sender_sources); +    } + +    if (created) +      on_new_ssrc (sess, source); + +    on_bye_ssrc (sess, source); +    RTP_SESSION_UNLOCK (sess); +  } +  g_free (reason); +} + +static void +rtp_session_process_app (RTPSession * sess, GstRTCPPacket * packet, +    RTPArrivalStats * arrival) +{ +  GST_DEBUG ("received APP"); +} + +/** + * rtp_session_process_rtcp: + * @sess: and #RTPSession + * @buffer: an RTCP buffer + * + * Process an RTCP buffer in the session manager. + * + * Returns: a #GstFlowReturn. + */ +GstFlowReturn +rtp_session_process_rtcp (RTPSession * sess, GstBuffer * buffer) +{ +  GstRTCPPacket packet; +  gboolean more; +  RTPArrivalStats arrival; +  guint size; + +  g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); +  g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + +  if (!gst_rtcp_buffer_validate (buffer)) +    goto invalid_packet; + +  /* update arrival stats */ +  update_arrival_stats (sess, &arrival, FALSE, buffer); + +  GST_DEBUG ("received RTCP packet"); + +  /* get packet size including header overhead */ +  size = GST_BUFFER_SIZE (buffer) + sess->header_len; + +  /* update average RTCP packet size */ +  if (sess->stats.avg_rtcp_packet_size == 0) +    sess->stats.avg_rtcp_packet_size = size; +  else +    sess->stats.avg_rtcp_packet_size = +        (size + (15 * sess->stats.avg_rtcp_packet_size)) >> 4; + +  /* start processing the compound packet */ +  more = gst_rtcp_buffer_get_first_packet (buffer, &packet); +  while (more) { +    switch (gst_rtcp_packet_get_type (&packet)) { +      case GST_RTCP_TYPE_SR: +        rtp_session_process_sr (sess, &packet, &arrival); +        break; +      case GST_RTCP_TYPE_RR: +        rtp_session_process_rr (sess, &packet, &arrival); +        break; +      case GST_RTCP_TYPE_SDES: +        rtp_session_process_sdes (sess, &packet, &arrival); +        break; +      case GST_RTCP_TYPE_BYE: +        rtp_session_process_bye (sess, &packet, &arrival); +        break; +      case GST_RTCP_TYPE_APP: +        rtp_session_process_app (sess, &packet, &arrival); +        break; +      default: +        GST_WARNING ("got unknown RTCP packet"); +        break; +    } +    more = gst_rtcp_packet_move_to_next (&packet); +  } + +  gst_buffer_unref (buffer); + +  return GST_FLOW_OK; + +  /* ERRORS */ +invalid_packet: +  { +    GST_DEBUG ("invalid RTCP packet received"); +    return GST_FLOW_OK; +  } +} + +/** + * rtp_session_send_rtp: + * @sess: and #RTPSession + * @buffer: an RTP buffer + * + * Send the RTP buffer in the session manager. + * + * Returns: a #GstFlowReturn. + */ +GstFlowReturn +rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer) +{ +  GstFlowReturn result; +  RTPSource *source; +  gboolean prevsender; + +  g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); +  g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + +  source = sess->source; + +  prevsender = RTP_SOURCE_IS_SENDER (source); + +  /* we use our own source to send */ +  result = rtp_source_send_rtp (sess->source, buffer); + +  if (RTP_SOURCE_IS_SENDER (source) && !prevsender) +    sess->stats.sender_sources++; + +  return result; +} + +/** + * rtp_session_get_rtcp_interval: + * @sess: an #RTPSession + * + * Get the interval for sending out the next RTCP packet + * + * Returns: an interval in seconds. + */ +gdouble +rtp_session_get_rtcp_interval (RTPSession * sess) +{ +  gdouble result; + +  g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); + +  RTP_SESSION_LOCK (sess); +  result = rtp_stats_calculate_rtcp_interval (&sess->stats, FALSE); +  result = rtp_stats_add_rtcp_jitter (&sess->stats, result); +  RTP_SESSION_UNLOCK (sess); + +  return result; +} + +/** + * rtp_session_produce_rtcp: + * @sess: an #RTPSession + * + * Instruct the session manager to generate RTCP packets with current stats. + * This function will call the #RTPSessionSendRTCP callback, possibly multiple + * times, for each packet that should be processed. + * + * Returns: a #GstFlowReturn. + */ +GstFlowReturn +rtp_session_produce_rtcp (RTPSession * sess) +{ +  /* FIXME: implement me */ +  return GST_FLOW_NOT_SUPPORTED; +} diff --git a/gst/rtpmanager/rtpsession.h b/gst/rtpmanager/rtpsession.h new file mode 100644 index 00000000..46062c99 --- /dev/null +++ b/gst/rtpmanager/rtpsession.h @@ -0,0 +1,206 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __RTP_SESSION_H__ +#define __RTP_SESSION_H__ + +#include <gst/gst.h> +#include <gst/netbuffer/gstnetbuffer.h> + +#include "rtpsource.h" + +typedef struct _RTPSession RTPSession; +typedef struct _RTPSessionClass RTPSessionClass; + +#define RTP_TYPE_SESSION             (rtp_session_get_type()) +#define RTP_SESSION(sess)            (G_TYPE_CHECK_INSTANCE_CAST((sess),RTP_TYPE_SESSION,RTPSession)) +#define RTP_SESSION_CLASS(klass)     (G_TYPE_CHECK_CLASS_CAST((klass),RTP_TYPE_SESSION,RTPSessionClass)) +#define RTP_IS_SESSION(sess)         (G_TYPE_CHECK_INSTANCE_TYPE((sess),RTP_TYPE_SESSION)) +#define RTP_IS_SESSION_CLASS(klass)  (G_TYPE_CHECK_CLASS_TYPE((klass),RTP_TYPE_SESSION)) +#define RTP_SESSION_CAST(sess)       ((RTPSession *)(sess)) + +#define RTP_SESSION_LOCK(sess)     (g_mutex_lock ((sess)->lock)) +#define RTP_SESSION_UNLOCK(sess)   (g_mutex_unlock ((sess)->lock)) + +/** + * RTPSessionProcessRTP: + * @sess: an #RTPSession + * @src: the #RTPSource + * @buffer: the RTP buffer ready for processing + * @user_data: user data specified when registering + * + * This callback will be called when @sess has @buffer ready for further + * processing. Processing the buffer typically includes decoding and displaying + * the buffer. + * + * Returns: a #GstFlowReturn. + */ +typedef GstFlowReturn (*RTPSessionProcessRTP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data); + +/** + * RTPSessionSendRTP: + * @sess: an #RTPSession + * @src: the #RTPSource + * @buffer: the RTP buffer ready for sending + * @user_data: user data specified when registering + * + * This callback will be called when @sess has @buffer ready for sending to + * all listening participants in this session. + * + * Returns: a #GstFlowReturn. + */ +typedef GstFlowReturn (*RTPSessionSendRTP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data); + +/** + * RTPSessionSendRTCP: + * @sess: an #RTPSession + * @src: the #RTPSource + * @buffer: the RTCP buffer ready for sending + * @user_data: user data specified when registering + * + * This callback will be called when @sess has @buffer ready for sending to + * all listening participants in this session. + * + * Returns: a #GstFlowReturn. + */ +typedef GstFlowReturn (*RTPSessionSendRTCP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data); + +/** + * RTPSessionClockRate: + * @sess: an #RTPSession + * @payload: the payload + * @user_data: user data specified when registering + * + * This callback will be called when @sess needs the clock-rate of @payload. + * + * Returns: the clock-rate of @pt. + */ +typedef gint (*RTPSessionClockRate) (RTPSession *sess, guint8 payload, gpointer user_data); + +/** + * RTPSessionGetTime: + * @sess: an #RTPSession + * @user_data: user data specified when registering + * + * This callback will be called when @sess needs the current time in + * nanoseconds. + * + * Returns: a #GstClockTime with the current time in nanoseconds. + */ +typedef GstClockTime (*RTPSessionGetTime) (RTPSession *sess, gpointer user_data); + +/** + * RTPSessionCallbacks: + * @RTPSessionProcessRTP: callback to process RTP packets + * @RTPSessionSendRTP: callback for sending RTP packets + * @RTPSessionSendRTCP: callback for sending RTCP packets + * @RTPSessionGetTime: callback for returning the current time + * + * These callbacks can be installed on the session manager to get notification + * when RTP and RTCP packets are ready for further processing. These callbacks + * are not implemented with signals for performance reasons. + */ +typedef struct { +  RTPSessionProcessRTP  process_rtp; +  RTPSessionSendRTP     send_rtp; +  RTPSessionSendRTCP    send_rtcp; +  RTPSessionClockRate   clock_rate; +  RTPSessionGetTime     get_time; +} RTPSessionCallbacks; + +/** + * RTPSession: + * @lock: lock to protect the session + * @source: the source of this session + * @ssrcs: Hashtable of sources indexed by SSRC + * @cnames: Hashtable of sources indexed by CNAME + * @num_sources: the number of sources + * @activecount: the number of active sources + * @callbacks: callbacks + * @user_data: user data passed in callbacks + * + * The RTP session manager object + */ +struct _RTPSession { +  GObject       object; + +  GMutex       *lock; + +  guint         header_len; + +  RTPSource    *source; +  GHashTable   *ssrcs; +  GHashTable   *cnames; +  guint         total_sources; + +  RTPSessionCallbacks callbacks; +  gpointer            user_data; + +  RTPSessionStats stats; +}; + +/** + * RTPSessionClass: + * @on_new_ssrc: emited when a new source is found + * @on_bye_ssrc: emited when a source is gone + * + * The session class. + */ +struct _RTPSessionClass { +  GObjectClass   parent_class; + +  /* signals */ +  void (*on_new_ssrc)       (RTPSession *sess, RTPSource *source); +  void (*on_ssrc_collision) (RTPSession *sess, RTPSource *source); +  void (*on_ssrc_validated) (RTPSession *sess, RTPSource *source); +  void (*on_bye_ssrc)       (RTPSession *sess, RTPSource *source); +}; + +GType rtp_session_get_type (void); + +/* create and configure */ +RTPSession*     rtp_session_new           (void); +void            rtp_session_set_callbacks          (RTPSession *sess,  +		                                    RTPSessionCallbacks *callbacks, +                                                    gpointer user_data); +void            rtp_session_set_bandwidth          (RTPSession *sess, gdouble bandwidth); +gdouble         rtp_session_get_bandwidth          (RTPSession *sess); +void            rtp_session_set_rtcp_fraction      (RTPSession *sess, gdouble fraction); +gdouble         rtp_session_get_rtcp_fraction      (RTPSession *sess); + +/* handling sources */ +gboolean        rtp_session_add_source             (RTPSession *sess, RTPSource *src); +gint            rtp_session_get_num_sources        (RTPSession *sess); +gint            rtp_session_get_num_active_sources (RTPSession *sess); +RTPSource*      rtp_session_get_source_by_ssrc     (RTPSession *sess, guint32 ssrc); +RTPSource*      rtp_session_get_source_by_cname    (RTPSession *sess, const gchar *cname); +RTPSource*      rtp_session_create_source          (RTPSession *sess); + +/* processing packets from receivers */ +GstFlowReturn   rtp_session_process_rtp            (RTPSession *sess, GstBuffer *buffer); +GstFlowReturn   rtp_session_process_rtcp           (RTPSession *sess, GstBuffer *buffer); + +/* processing packets for sending */ +GstFlowReturn   rtp_session_send_rtp               (RTPSession *sess, GstBuffer *buffer); + +/* get interval for next RTCP interval */ +gdouble         rtp_session_get_rtcp_interval      (RTPSession *sess); +GstFlowReturn   rtp_session_produce_rtcp           (RTPSession *sess); + +#endif /* __RTP_SESSION_H__ */ diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c new file mode 100644 index 00000000..36f54381 --- /dev/null +++ b/gst/rtpmanager/rtpsource.c @@ -0,0 +1,477 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ +#include <string.h> + +#include <gst/rtp/gstrtpbuffer.h> +#include <gst/rtp/gstrtcpbuffer.h> + +#include "rtpsource.h" + +GST_DEBUG_CATEGORY_STATIC (rtp_source_debug); +#define GST_CAT_DEFAULT rtp_source_debug + +#define RTP_MAX_PROBATION_LEN	32 + +/* signals and args */ +enum +{ +  LAST_SIGNAL +}; + +enum +{ +  PROP_0 +}; + +/* GObject vmethods */ +static void rtp_source_finalize (GObject * object); + +/* static guint rtp_source_signals[LAST_SIGNAL] = { 0 }; */ + +G_DEFINE_TYPE (RTPSource, rtp_source, G_TYPE_OBJECT); + +static void +rtp_source_class_init (RTPSourceClass * klass) +{ +  GObjectClass *gobject_class; + +  gobject_class = (GObjectClass *) klass; + +  gobject_class->finalize = rtp_source_finalize; + +  GST_DEBUG_CATEGORY_INIT (rtp_source_debug, "rtpsource", 0, "RTP Source"); +} + +static void +rtp_source_init (RTPSource * src) +{ +  /* sources are initialy on probation until we receive enough valid RTP +   * packets or a valid RTCP packet */ +  src->validated = FALSE; +  src->probation = RTP_DEFAULT_PROBATION; + +  src->payload = 0; +  src->clock_rate = -1; +  src->packets = g_queue_new (); + +  src->stats.jitter = 0; +  src->stats.transit = -1; +  src->stats.curr_sr = 0; +  src->stats.curr_rr = 0; +} + +static void +rtp_source_finalize (GObject * object) +{ +  RTPSource *src; +  GstBuffer *buffer; + +  src = RTP_SOURCE_CAST (object); + +  while ((buffer = g_queue_pop_head (src->packets))) +    gst_buffer_unref (buffer); +  g_queue_free (src->packets); + +  G_OBJECT_CLASS (rtp_source_parent_class)->finalize (object); +} + +/** + * rtp_source_new: + * @ssrc: an SSRC + * + * Create a #RTPSource with @ssrc. + * + * Returns: a new #RTPSource. Use g_object_unref() after usage. + */ +RTPSource * +rtp_source_new (guint32 ssrc) +{ +  RTPSource *src; + +  src = g_object_new (RTP_TYPE_SOURCE, NULL); +  src->ssrc = ssrc; + +  return src; +} + +/** + * rtp_source_set_callbacks: + * @src: an #RTPSource + * @cb: callback functions + * @user_data: user data + * + * Set the callbacks for the source. + */ +void +rtp_source_set_callbacks (RTPSource * src, RTPSourceCallbacks * cb, +    gpointer user_data) +{ +  g_return_if_fail (RTP_IS_SOURCE (src)); + +  src->callbacks.push_rtp = cb->push_rtp; +  src->callbacks.clock_rate = cb->clock_rate; +  src->user_data = user_data; +} + +/** + * rtp_source_set_as_csrc: + * @src: an #RTPSource + * + * Configure @src as a CSRC, this will validate the RTpSource. + */ +void +rtp_source_set_as_csrc (RTPSource * src) +{ +  g_return_if_fail (RTP_IS_SOURCE (src)); + +  src->validated = TRUE; +  src->is_csrc = TRUE; +} + +/** + * rtp_source_set_rtp_from: + * @src: an #RTPSource + * @address: the RTP address to set + * + * Set that @src is receiving RTP packets from @address. This is used for + * collistion checking. + */ +void +rtp_source_set_rtp_from (RTPSource * src, GstNetAddress * address) +{ +  g_return_if_fail (RTP_IS_SOURCE (src)); + +  src->have_rtp_from = TRUE; +  memcpy (&src->rtp_from, address, sizeof (GstNetAddress)); +} + +/** + * rtp_source_set_rtcp_from: + * @src: an #RTPSource + * @address: the RTCP address to set + * + * Set that @src is receiving RTCP packets from @address. This is used for + * collistion checking. + */ +void +rtp_source_set_rtcp_from (RTPSource * src, GstNetAddress * address) +{ +  g_return_if_fail (RTP_IS_SOURCE (src)); + +  src->have_rtcp_from = TRUE; +  memcpy (&src->rtcp_from, address, sizeof (GstNetAddress)); +} + +static GstFlowReturn +push_packet (RTPSource * src, GstBuffer * buffer) +{ +  GstFlowReturn ret = GST_FLOW_OK; + +  /* push queued packets first if any */ +  while (!g_queue_is_empty (src->packets)) { +    GstBuffer *buffer = GST_BUFFER_CAST (g_queue_pop_head (src->packets)); + +    GST_DEBUG ("pushing queued packet"); +    if (src->callbacks.push_rtp) +      src->callbacks.push_rtp (src, buffer, src->user_data); +    else +      gst_buffer_unref (buffer); +  } +  GST_DEBUG ("pushing new packet"); +  /* push packet */ +  if (src->callbacks.push_rtp) +    ret = src->callbacks.push_rtp (src, buffer, src->user_data); +  else +    gst_buffer_unref (buffer); + +  return ret; +} + +static gint +get_clock_rate (RTPSource * src, guint8 payload) +{ +  if (payload != src->payload) { +    gint clock_rate = -1; + +    if (src->callbacks.clock_rate) +      clock_rate = src->callbacks.clock_rate (src, payload, src->user_data); + +    GST_DEBUG ("new payload %d, got clock-rate %d", payload, clock_rate); + +    src->clock_rate = clock_rate; +    src->payload = payload; +  } +  return src->clock_rate; +} + +static void +calculate_jitter (RTPSource * src, GstBuffer * buffer, +    RTPArrivalStats * arrival) +{ +  GstClockTime current; +  guint32 rtparrival, transit, rtptime; +  gint32 diff; +  gint clock_rate; +  guint8 pt; + +  /* get arrival time */ +  if ((current = arrival->time) == GST_CLOCK_TIME_NONE) +    goto no_time; + +  pt = gst_rtp_buffer_get_payload_type (buffer); + +  /* get clockrate */ +  if ((clock_rate = get_clock_rate (src, pt)) == -1) +    goto no_clock_rate; + +  rtptime = gst_rtp_buffer_get_timestamp (buffer); + +  /* convert arrival time to RTP timestamp units */ +  rtparrival = gst_util_uint64_scale_int (current, clock_rate, GST_SECOND); + +  /* transit time is difference with RTP timestamp */ +  transit = rtparrival - rtptime; +  /* get diff with previous transit time */ +  if (src->stats.transit != -1) +    diff = transit - src->stats.transit; +  else +    diff = 0; +  src->stats.transit = transit; +  if (diff < 0) +    diff = -diff; +  /* update jitter */ +  src->stats.jitter += diff - ((src->stats.jitter + 8) >> 4); + +  src->stats.prev_rtptime = src->stats.last_rtptime; +  src->stats.last_rtptime = rtparrival; + +  GST_DEBUG ("rtparrival %u, rtptime %u, clock-rate %d, diff %d, jitter: %u", +      rtparrival, rtptime, clock_rate, diff, src->stats.jitter); + +  return; + +  /* ERRORS */ +no_time: +  { +    GST_WARNING ("cannot get current time"); +    return; +  } +no_clock_rate: +  { +    GST_WARNING ("cannot get clock-rate for pt %d", pt); +    return; +  } +} + +/** + * rtp_source_process_rtp: + * @src: an #RTPSource + * @buffer: an RTP buffer + * + * Let @src handle the incomming RTP @buffer. + * + * Returns: a #GstFlowReturn. + */ +GstFlowReturn +rtp_source_process_rtp (RTPSource * src, GstBuffer * buffer, +    RTPArrivalStats * arrival) +{ +  GstFlowReturn result = GST_FLOW_OK; + +  g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); +  g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + +  /* if we are still on probation, check seqnum */ +  if (src->probation) { +    guint16 seqnr, expected; + +    expected = src->stats.max_seqnr + 1; + +    /* when in probation, we require consecutive seqnums */ +    seqnr = gst_rtp_buffer_get_seq (buffer); +    if (seqnr == expected) { +      /* expected packet */ +      src->probation--; +      src->stats.max_seqnr = seqnr; + +      GST_DEBUG ("probation: seqnr %d == expected %d", seqnr, expected); +    } else { +      GST_DEBUG ("probation: seqnr %d != expected %d", seqnr, expected); +      src->probation = RTP_DEFAULT_PROBATION; +      src->stats.max_seqnr = seqnr; +    } +  } +  if (src->probation) { +    GstBuffer *q; + +    GST_DEBUG ("probation %d: queue buffer", src->probation); +    /* when still in probation, keep packets in a list. */ +    g_queue_push_tail (src->packets, buffer); +    /* remove packets from queue if there are too many */ +    while (g_queue_get_length (src->packets) > RTP_MAX_PROBATION_LEN) { +      q = g_queue_pop_head (src->packets); +      gst_object_unref (q); +    } +  } else { +    /* we are not in probation */ +    src->stats.octetsreceived += arrival->payload_len; +    src->stats.bytesreceived += arrival->bytes; +    src->stats.packetsreceived++; +    src->is_sender = TRUE; + +    GST_DEBUG ("PC: %" G_GUINT64_FORMAT ", OC: %" G_GUINT64_FORMAT, +        src->stats.packetsreceived, src->stats.octetsreceived); + +    /* calculate jitter */ +    calculate_jitter (src, buffer, arrival); + +    /* we're ready to push the RTP packet now */ +    result = push_packet (src, buffer); +  } +  return result; +} + +/** + * rtp_source_process_bye: + * @src: an #RTPSource + * @reason: the reason for leaving + * + * Notify @src that a BYE packet has been received. This will make the source + * inactive. + */ +void +rtp_source_process_bye (RTPSource * src, const gchar * reason) +{ +  g_return_if_fail (RTP_IS_SOURCE (src)); + +  GST_DEBUG ("marking SSRC %08x as BYE, reason: %s", src->ssrc, +      GST_STR_NULL (reason)); + +  /* copy the reason and mark as received_bye */ +  g_free (src->bye_reason); +  src->bye_reason = g_strdup (reason); +  src->received_bye = TRUE; +} + +/** + * rtp_source_send_rtp: + * @src: an #RTPSource + * @buffer: an RTP buffer + * + * Send an RTP @buffer originating from @src. This will make @src a sender. + * + * Returns: a #GstFlowReturn. + */ +GstFlowReturn +rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer) +{ +  GstFlowReturn result = GST_FLOW_OK; + +  g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); +  g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + +  /* we are a sender now */ +  src->is_sender = TRUE; + +  /* push packet */ +  if (src->callbacks.push_rtp) +    result = src->callbacks.push_rtp (src, buffer, src->user_data); +  else +    gst_buffer_unref (buffer); + +  return result; +} + +/** + * rtp_source_process_sr: + * @src: an #RTPSource + * @ntptime: the NTP time + * @rtptime: the RTP time + * @packet_count: the packet count + * @octet_count: the octect count + * + * Update the sender report in @src. + */ +void +rtp_source_process_sr (RTPSource * src, guint64 ntptime, guint32 rtptime, +    guint32 packet_count, guint32 octet_count) +{ +  RTPSenderReport *curr; +  gint curridx; + +  g_return_if_fail (RTP_IS_SOURCE (src)); + +  GST_DEBUG ("got SR packet: SSRC %08x, NTP %" G_GUINT64_FORMAT +      ", RTP %u, PC %u, OC %u", src->ssrc, ntptime, rtptime, packet_count, +      octet_count); + +  curridx = src->stats.curr_sr ^ 1; +  curr = &src->stats.sr[curridx]; + +  /* update current */ +  curr->is_valid = TRUE; +  curr->ntptime = ntptime; +  curr->rtptime = rtptime; +  curr->packet_count = packet_count; +  curr->octet_count = octet_count; + +  /* make current */ +  src->stats.curr_sr = curridx; +} + +/** + * rtp_source_process_rb: + * @src: an #RTPSource + * @fractionlost: fraction lost since last SR/RR + * @packetslost: the cumululative number of packets lost + * @exthighestseq: the extended last sequence number received + * @jitter: the interarrival jitter + * @lsr: the last SR packet from this source + * @dlsr: the delay since last SR packet + * + * Update the report block in @src. + */ +void +rtp_source_process_rb (RTPSource * src, guint8 fractionlost, gint32 packetslost, +    guint32 exthighestseq, guint32 jitter, guint32 lsr, guint32 dlsr) +{ +  RTPReceiverReport *curr; +  gint curridx; + +  g_return_if_fail (RTP_IS_SOURCE (src)); + +  GST_DEBUG ("got RB packet %d: SSRC %08x, FL %u" +      ", PL %u, HS %u, JITTER %u, LSR %u, DLSR %u", src->ssrc, fractionlost, +      packetslost, exthighestseq, jitter, lsr, dlsr); + +  curridx = src->stats.curr_rr ^ 1; +  curr = &src->stats.rr[curridx]; + +  /* update current */ +  curr->is_valid = TRUE; +  curr->fractionlost = fractionlost; +  curr->packetslost = packetslost; +  curr->exthighestseq = exthighestseq; +  curr->jitter = jitter; +  curr->lsr = lsr; +  curr->dlsr = dlsr; + +  /* make current */ +  src->stats.curr_rr = curridx; +} diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h new file mode 100644 index 00000000..d4ae6f55 --- /dev/null +++ b/gst/rtpmanager/rtpsource.h @@ -0,0 +1,162 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __RTP_SOURCE_H__ +#define __RTP_SOURCE_H__ + +#include <gst/gst.h> +#include <gst/rtp/gstrtcpbuffer.h> +#include <gst/netbuffer/gstnetbuffer.h> + +#include "rtpstats.h" + +/* the default number of consecutive RTP packets we need to receive before the + * source is considered valid */ +#define RTP_NO_PROBATION        0 +#define RTP_DEFAULT_PROBATION   2 + +typedef struct _RTPSource RTPSource; +typedef struct _RTPSourceClass RTPSourceClass; + +#define RTP_TYPE_SOURCE             (rtp_source_get_type()) +#define RTP_SOURCE(src)             (G_TYPE_CHECK_INSTANCE_CAST((src),RTP_TYPE_SOURCE,RTPSource)) +#define RTP_SOURCE_CLASS(klass)     (G_TYPE_CHECK_CLASS_CAST((klass),RTP_TYPE_SOURCE,RTPSourceClass)) +#define RTP_IS_SOURCE(src)          (G_TYPE_CHECK_INSTANCE_TYPE((src),RTP_TYPE_SOURCE)) +#define RTP_IS_SOURCE_CLASS(klass)  (G_TYPE_CHECK_CLASS_TYPE((klass),RTP_TYPE_SOURCE)) +#define RTP_SOURCE_CAST(src)        ((RTPSource *)(src)) + +/** + * RTP_SOURCE_IS_ACTIVE: + * @src: an #RTPSource + * + * Check if @src is active. A source is active when it has been validated + * and has not yet received a BYE packet. + */ +#define RTP_SOURCE_IS_ACTIVE(src)  (src->validated && !src->received_bye) + +/** + * RTP_SOURCE_IS_SENDER: + * @src: an #RTPSource + * + * Check if @src is a sender. + */ +#define RTP_SOURCE_IS_SENDER(src)  (src->is_sender) + +/** + * RTPSourcePushRTP: + * @src: an #RTPSource + * @buffer: the RTP buffer ready for processing + * @user_data: user data specified when registering + * + * This callback will be called when @src has @buffer ready for further + * processing. + * + * Returns: a #GstFlowReturn. + */ +typedef GstFlowReturn (*RTPSourcePushRTP) (RTPSource *src, GstBuffer *buffer, gpointer user_data); + +/** + * RTPSourceClockRate: + * @src: an #RTPSource + * @payload: a payload type + * @user_data: user data specified when registering + * + * This callback will be called when @src needs the clock-rate of the + * @payload. + * + * Returns: a clock-rate for @payload. + */ +typedef gint (*RTPSourceClockRate) (RTPSource *src, guint8 payload, gpointer user_data); + +/** + * RTPSourceCallbacks: + * @push_rtp: a packet becomes available for handling + * @clock_rate: a clock-rate is requested + * @get_time: the current clock time is requested + * + * Callbacks performed by #RTPSource when actions need to be performed. + */ +typedef struct { +  RTPSourcePushRTP     push_rtp; +  RTPSourceClockRate   clock_rate; +} RTPSourceCallbacks; + +/** + * RTPSource: + * + * A source in the #RTPSession + */ +struct _RTPSource { +  GObject       object; + +  /*< private >*/ +  RTPSourceCallbacks callbacks; +  gpointer           user_data; + +  guint32       ssrc; +  gchar        *cname; +  gint          probation; +  gboolean      validated; +  gboolean      received_bye; +  gchar        *bye_reason; + +  gboolean      is_csrc; +  gboolean      is_sender; + +  gboolean      have_rtp_from; +  GstNetAddress rtp_from; +  gboolean      have_rtcp_from; +  GstNetAddress rtcp_from; + +  guint8        payload; +  gint          clock_rate; + +  GQueue       *packets; + +  RTPSourceStats stats; +}; + +struct _RTPSourceClass { +  GObjectClass   parent_class; +}; + +GType rtp_source_get_type (void); + +/* managing lifetime of sources */ +RTPSource*      rtp_source_new            (guint32 ssrc); + +void            rtp_source_set_callbacks  (RTPSource *src, RTPSourceCallbacks *cb, gpointer data); +void            rtp_source_set_as_csrc    (RTPSource *src); + +void            rtp_source_set_rtp_from   (RTPSource *src, GstNetAddress *address); +void            rtp_source_set_rtcp_from  (RTPSource *src, GstNetAddress *address); + +GstFlowReturn   rtp_source_process_rtp    (RTPSource *src, GstBuffer *buffer, RTPArrivalStats *arrival); + +GstFlowReturn   rtp_source_send_rtp       (RTPSource *src, GstBuffer *buffer); + +/* RTCP messages */ +void            rtp_source_process_bye    (RTPSource *src, const gchar *reason); +void            rtp_source_process_sr     (RTPSource *src, guint64 ntptime, guint32 rtptime, +                                           guint32 packet_count, guint32 octet_count); +void            rtp_source_process_rb     (RTPSource *src, guint8 fractionlost, gint32 packetslost, +                                           guint32 exthighestseq, guint32 jitter, +                                           guint32 lsr, guint32 dlsr); + +#endif /* __RTP_SOURCE_H__ */ diff --git a/gst/rtpmanager/rtpstats.c b/gst/rtpmanager/rtpstats.c new file mode 100644 index 00000000..b9076eac --- /dev/null +++ b/gst/rtpmanager/rtpstats.c @@ -0,0 +1,111 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include "rtpstats.h" + +/** + * rtp_stats_init_defaults: + * @stats: an #RTPSessionStats struct + * + * Initialize @stats with its default values. + */ +void +rtp_stats_init_defaults (RTPSessionStats * stats) +{ +  stats->bandwidth = RTP_STATS_BANDWIDTH; +  stats->sender_fraction = RTP_STATS_SENDER_FRACTION; +  stats->receiver_fraction = RTP_STATS_RECEIVER_FRACTION; +  stats->rtcp_bandwidth = RTP_STATS_RTCP_BANDWIDTH; +  stats->min_interval = RTP_STATS_MIN_INTERVAL; +} + +/** + * rtp_stats_calculate_rtcp_interval: + * @stats: an #RTPSessionStats struct + *  + * Calculate the RTCP interval. The result of this function is the amount of + * time to wait (in seconds) before sender a new RTCP message. + * + * Returns: the RTCP interval. + */ +gdouble +rtp_stats_calculate_rtcp_interval (RTPSessionStats * stats, gboolean sender) +{ +  gdouble active, senders, receivers, sfraction; +  gboolean avg_rtcp; +  gdouble interval; + +  active = stats->active_sources; +  /* Try to avoid division by zero */ +  if (stats->active_sources == 0) +    active += 1.0; + +  senders = (gdouble) stats->sender_sources; +  receivers = (gdouble) (active - senders); +  avg_rtcp = (gdouble) stats->avg_rtcp_packet_size; + +  sfraction = senders / active; + +  GST_DEBUG ("senders: %f, receivers %f, avg_rtcp %f, sfraction %f", +      senders, receivers, avg_rtcp, sfraction); + +  if (sfraction <= stats->sender_fraction) { +    if (sender) { +      interval = +          (avg_rtcp * senders) / (stats->sender_fraction * +          stats->rtcp_bandwidth); +    } else { +      interval = +          (avg_rtcp * receivers) / ((1.0 - +              stats->sender_fraction) * stats->rtcp_bandwidth); +    } +  } else { +    interval = (avg_rtcp * active) / stats->rtcp_bandwidth; +  } + +  if (interval < stats->min_interval) +    interval = stats->min_interval; + +  if (!stats->sent_rtcp) +    interval /= 2.0; + +  return interval; +} + +/** + * rtp_stats_calculate_rtcp_interval: + * @stats: an #RTPSessionStats struct + * @interval: an RTCP interval + *  + * Apply a random jitter to the @interval. @interval is typically obtained with + * rtp_stats_calculate_rtcp_interval(). + * + * Returns: the new RTCP interval. + */ +gdouble +rtp_stats_add_rtcp_jitter (RTPSessionStats * stats, gdouble interval) +{ +  /* see RFC 3550 p 30  +   * To compensate for "unconditional reconsideration" converging to a +   * value below the intended average. +   */ +#define COMPENSATION  (2.71828 - 1.5); + +  return (interval * g_random_double_range (0.5, 1.5)) / COMPENSATION; +} diff --git a/gst/rtpmanager/rtpstats.h b/gst/rtpmanager/rtpstats.h new file mode 100644 index 00000000..66aa7bf7 --- /dev/null +++ b/gst/rtpmanager/rtpstats.h @@ -0,0 +1,161 @@ +/* GStreamer + * Copyright (C) <2007> Wim Taymans <wim@fluendo.com> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#ifndef __RTP_STATS_H__ +#define __RTP_STATS_H__ + +#include <gst/gst.h> +#include <gst/netbuffer/gstnetbuffer.h> + +/** + * RTPSenderReport: + * + * A sender report structure. + */ +typedef struct { +  gboolean is_valid; +  guint64 ntptime; +  guint32 rtptime; +  guint32 packet_count; +  guint32 octet_count; +} RTPSenderReport; + +/** + * RTPReceiverReport: + * + * A receiver report structure. + */ +typedef struct { +  gboolean is_valid; +  guint32 ssrc; /* who the report is from */ +  guint8  fractionlost; +  guint32 packetslost; +  guint32 exthighestseq; +  guint32 jitter; +  guint32 lsr; +  guint32 dlsr; +} RTPReceiverReport; + +/** + * RTPArrivalStats: + * @time: arrival time of a packet + * @address: address of the sender of the packet + * @bytes: bytes of the packet including lowlevel overhead + * @payload_len: bytes of the RTP payload + * + * Structure holding information about the arrival stats of a packet. + */ +typedef struct { +  GstClockTime  time; +  gboolean      have_address; +  GstNetAddress address; +  guint         bytes; +  guint         payload_len; +} RTPArrivalStats; + +/** + * RTPSourceStats: + * @packetsreceived: number of received packets in total + * @prevpacketsreceived: number of packets received in previous reporting + *                       interval + * @octetsreceived: number of payload bytes received + * @bytesreceived: number of total bytes received including headers and lower + *                 protocol level overhead + * @max_seqnr: highest sequence number received + * @transit: previous transit time used for calculating @jitter + * @jitter: current jitter + * @prev_rtptime: previous time when an RTP packet was received + * @prev_rtcptime: previous time when an RTCP packet was received + * @last_rtptime: time when last RTP packet received + * @last_rtcptime: time when last RTCP packet received + * @curr_rr: index of current @rr block + * @rr: previous and current receiver report block + * @curr_sr: index of current @sr block + * @sr: previous and current sender report block + * + * Stats about a source. + */ +typedef struct { +  guint64      packetsreceived; +  guint64      prevpacketsreceived; +  guint64      octetsreceived; +  guint64      bytesreceived; +  guint16      max_seqnr; +  guint32      transit; +  guint32      jitter; + +  /* when we received stuff */ +  GstClockTime prev_rtptime; +  GstClockTime prev_rtcptime; +  GstClockTime last_rtptime; +  GstClockTime last_rtcptime; + +  /* sender and receiver reports */ +  gint              curr_rr; +  RTPReceiverReport rr[2]; +  gint              curr_sr; +  RTPSenderReport   sr[2]; +} RTPSourceStats; + +#define RTP_STATS_BANDWIDTH           64000.0 +#define RTP_STATS_RTCP_BANDWIDTH      3000.0 +/* + * Minimum average time between RTCP packets from this site (in + * seconds).  This time prevents the reports from `clumping' when + * sessions are small and the law of large numbers isn't helping + * to smooth out the traffic.  It also keeps the report interval + * from becoming ridiculously small during transient outages like + * a network partition. + */ +#define RTP_STATS_MIN_INTERVAL      5.0 + /* + * Fraction of the RTCP bandwidth to be shared among active + * senders.  (This fraction was chosen so that in a typical + * session with one or two active senders, the computed report + * time would be roughly equal to the minimum report time so that + * we don't unnecessarily slow down receiver reports.) The + * receiver fraction must be 1 - the sender fraction. + */ +#define RTP_STATS_SENDER_FRACTION       (0.25) +#define RTP_STATS_RECEIVER_FRACTION     (1.0 - RTP_STATS_SENDER_FRACTION) + +/** + * RTPSessionStats: + * + * Stats kept for a session and used to produce RTCP packet timeouts. + */ +typedef struct { +  gdouble       bandwidth; +  gdouble       sender_fraction; +  gdouble       receiver_fraction; +  gdouble       rtcp_bandwidth; +  gdouble       min_interval; +  guint         sender_sources; +  guint         active_sources; +  guint         avg_rtcp_packet_size; +  guint         avg_bye_packet_size; +  gboolean      sent_rtcp; +} RTPSessionStats; + +void           rtp_stats_init_defaults               (RTPSessionStats *stats); + +gdouble        rtp_stats_calculate_rtcp_interval    (RTPSessionStats *stats, gboolean sender); +gdouble        rtp_stats_add_rtcp_jitter            (RTPSessionStats *stats, gdouble interval); + +#endif /* __RTP_STATS_H__ */  | 
