From 779f67adc41f4ee99e9e4a2cd84cee2bef46e88a Mon Sep 17 00:00:00 2001 From: Branko Subasic Date: Fri, 19 Jun 2009 19:09:19 +0200 Subject: rtpbin: add support for buffer-list Add support for sending buffer-lists. Add unit test for testing that the buffer-list passed through rtpbin. fixes #585839 --- gst/rtpmanager/gstrtpsession.c | 62 ++++-- gst/rtpmanager/rtpsession.c | 37 ++-- gst/rtpmanager/rtpsession.h | 4 +- gst/rtpmanager/rtpsource.c | 118 ++++++++--- gst/rtpmanager/rtpsource.h | 2 +- tests/check/elements/rtpbin_buffer_list.c | 331 ++++++++++++++++++++++++++++++ 6 files changed, 492 insertions(+), 62 deletions(-) create mode 100644 tests/check/elements/rtpbin_buffer_list.c diff --git a/gst/rtpmanager/gstrtpsession.c b/gst/rtpmanager/gstrtpsession.c index c33fdfc6..9407ee52 100644 --- a/gst/rtpmanager/gstrtpsession.c +++ b/gst/rtpmanager/gstrtpsession.c @@ -259,7 +259,7 @@ struct _GstRtpSessionPrivate static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src, GstBuffer * buffer, gpointer user_data); static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess, - RTPSource * src, GstBuffer * buffer, gpointer user_data); + RTPSource * src, gpointer data, gpointer user_data); static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess, RTPSource * src, GstBuffer * buffer, gboolean eos, gpointer user_data); static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess, @@ -1032,8 +1032,8 @@ gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession) g_hash_table_foreach_remove (rtpsession->priv->ptmap, return_true, NULL); } -/* called when the session manager has an RTP packet ready for further - * processing */ +/* called when the session manager has an RTP packet or a list of packets + * ready for further processing */ static GstFlowReturn gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src, GstBuffer * buffer, gpointer user_data) @@ -1060,7 +1060,7 @@ gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src, * sending */ static GstFlowReturn gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src, - GstBuffer * buffer, gpointer user_data) + gpointer data, gpointer user_data) { GstFlowReturn result; GstRtpSession *rtpsession; @@ -1069,12 +1069,17 @@ gst_rtp_session_send_rtp (RTPSession * sess, RTPSource * src, rtpsession = GST_RTP_SESSION (user_data); priv = rtpsession->priv; - GST_LOG_OBJECT (rtpsession, "sending RTP packet"); - if (rtpsession->send_rtp_src) { - result = gst_pad_push (rtpsession->send_rtp_src, buffer); + if (GST_IS_BUFFER (data)) { + GST_LOG_OBJECT (rtpsession, "sending RTP packet"); + result = gst_pad_push (rtpsession->send_rtp_src, GST_BUFFER_CAST (data)); + } else { + GST_LOG_OBJECT (rtpsession, "sending RTP list"); + result = gst_pad_push_list (rtpsession->send_rtp_src, + GST_BUFFER_LIST_CAST (data)); + } } else { - gst_buffer_unref (buffer); + gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); result = GST_FLOW_OK; } return result; @@ -1642,11 +1647,12 @@ gst_rtp_session_setcaps_send_rtp (GstPad * pad, GstCaps * caps) return TRUE; } -/* Recieve an RTP packet to be send to the receivers, send to RTP session - * manager and forward to send_rtp_src. +/* Recieve an RTP packet or a list of packets to be send to the receivers, + * send to RTP session manager and forward to send_rtp_src. */ static GstFlowReturn -gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) +gst_rtp_session_chain_send_rtp_common (GstPad * pad, gpointer data, + gboolean is_list) { GstRtpSession *rtpsession; GstRtpSessionPrivate *priv; @@ -1658,10 +1664,22 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); priv = rtpsession->priv; - GST_LOG_OBJECT (rtpsession, "received RTP packet"); + GST_LOG_OBJECT (rtpsession, "received RTP %s", is_list ? "list" : "packet"); /* get NTP time when this packet was captured, this depends on the timestamp. */ - timestamp = GST_BUFFER_TIMESTAMP (buffer); + if (is_list) { + GstBuffer *buffer = NULL; + + /* All groups in an list have the same timestamp. + * So, just take it from the first group. */ + buffer = gst_buffer_list_get (GST_BUFFER_LIST_CAST (data), 0, 0); + if (buffer) + timestamp = GST_BUFFER_TIMESTAMP (buffer); + else + timestamp = -1; + } else { + timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (data)); + } if (GST_CLOCK_TIME_IS_VALID (timestamp)) { /* convert to running time using the segment start value. */ ntpnstime = @@ -1676,7 +1694,9 @@ gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) } current_time = gst_clock_get_time (priv->sysclock); - ret = rtp_session_send_rtp (priv->session, buffer, current_time, ntpnstime); + ret = + rtp_session_send_rtp (priv->session, data, is_list, current_time, + ntpnstime); if (ret != GST_FLOW_OK) goto push_error; @@ -1694,6 +1714,18 @@ push_error: } } +static GstFlowReturn +gst_rtp_session_chain_send_rtp (GstPad * pad, GstBuffer * buffer) +{ + return gst_rtp_session_chain_send_rtp_common (pad, buffer, FALSE); +} + +static GstFlowReturn +gst_rtp_session_chain_send_rtp_list (GstPad * pad, GstBufferList * list) +{ + return gst_rtp_session_chain_send_rtp_common (pad, list, TRUE); +} + /* Create sinkpad to receive RTP packets from senders. This will also create a * srcpad for the RTP packets. */ @@ -1817,6 +1849,8 @@ create_send_rtp_sink (GstRtpSession * rtpsession) "send_rtp_sink"); gst_pad_set_chain_function (rtpsession->send_rtp_sink, gst_rtp_session_chain_send_rtp); + gst_pad_set_chain_list_function (rtpsession->send_rtp_sink, + gst_rtp_session_chain_send_rtp_list); gst_pad_set_getcaps_function (rtpsession->send_rtp_sink, gst_rtp_session_getcaps_send_rtp); gst_pad_set_setcaps_function (rtpsession->send_rtp_sink, diff --git a/gst/rtpmanager/rtpsession.c b/gst/rtpmanager/rtpsession.c index 219aacf1..cda04182 100644 --- a/gst/rtpmanager/rtpsession.c +++ b/gst/rtpmanager/rtpsession.c @@ -958,7 +958,7 @@ rtp_session_get_sdes_string (RTPSession * sess, GstRTCPSDESType type) } static GstFlowReturn -source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session) +source_push_rtp (RTPSource * source, gpointer data, RTPSession * session) { GstFlowReturn result = GST_FLOW_OK; @@ -969,21 +969,21 @@ source_push_rtp (RTPSource * source, GstBuffer * buffer, RTPSession * session) if (session->callbacks.send_rtp) result = - session->callbacks.send_rtp (session, source, buffer, + session->callbacks.send_rtp (session, source, data, session->send_rtp_user_data); - else - gst_buffer_unref (buffer); - + else { + gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); + } } else { GST_LOG ("source %08x pushed receiver RTP packet", source->ssrc); RTP_SESSION_UNLOCK (session); if (session->callbacks.process_rtp) result = - session->callbacks.process_rtp (session, source, buffer, - session->process_rtp_user_data); + session->callbacks.process_rtp (session, source, + GST_BUFFER_CAST (data), session->process_rtp_user_data); else - gst_buffer_unref (buffer); + gst_buffer_unref (GST_BUFFER_CAST (data)); } RTP_SESSION_LOCK (session); @@ -1962,7 +1962,7 @@ ignore: /** * rtp_session_send_rtp: * @sess: an #RTPSession - * @buffer: an RTP buffer + * @data: pointer to either an RTP buffer or a list of RTP buffers * @current_time: the current system time * @ntpnstime: the NTP time in nanoseconds of when this buffer was captured. * This is the buffer timestamp converted to NTP time. @@ -1973,20 +1973,27 @@ ignore: * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, +rtp_session_send_rtp (RTPSession * sess, gpointer data, gboolean is_list, GstClockTime current_time, guint64 ntpnstime) { GstFlowReturn result; RTPSource *source; gboolean prevsender; + gboolean valid_packet; g_return_val_if_fail (RTP_IS_SESSION (sess), GST_FLOW_ERROR); - g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR); - if (!gst_rtp_buffer_validate (buffer)) + if (is_list) { + valid_packet = gst_rtp_buffer_list_validate (GST_BUFFER_LIST_CAST (data)); + } else { + valid_packet = gst_rtp_buffer_validate (GST_BUFFER_CAST (data)); + } + + if (!valid_packet) goto invalid_packet; - GST_LOG ("received RTP packet for sending"); + GST_LOG ("received RTP %s for sending", is_list ? "list" : "packet"); RTP_SESSION_LOCK (sess); source = sess->source; @@ -1997,7 +2004,7 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, prevsender = RTP_SOURCE_IS_SENDER (source); /* we use our own source to send */ - result = rtp_source_send_rtp (source, buffer, ntpnstime); + result = rtp_source_send_rtp (source, data, is_list, ntpnstime); if (RTP_SOURCE_IS_SENDER (source) && !prevsender) sess->stats.sender_sources++; @@ -2008,7 +2015,7 @@ rtp_session_send_rtp (RTPSession * sess, GstBuffer * buffer, /* ERRORS */ invalid_packet: { - gst_buffer_unref (buffer); + gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); GST_DEBUG ("invalid RTP packet received"); return GST_FLOW_OK; } diff --git a/gst/rtpmanager/rtpsession.h b/gst/rtpmanager/rtpsession.h index 7e327a7d..6312f1c1 100644 --- a/gst/rtpmanager/rtpsession.h +++ b/gst/rtpmanager/rtpsession.h @@ -65,7 +65,7 @@ typedef GstFlowReturn (*RTPSessionProcessRTP) (RTPSession *sess, RTPSource *src, * * Returns: a #GstFlowReturn. */ -typedef GstFlowReturn (*RTPSessionSendRTP) (RTPSession *sess, RTPSource *src, GstBuffer *buffer, gpointer user_data); +typedef GstFlowReturn (*RTPSessionSendRTP) (RTPSession *sess, RTPSource *src, gpointer data, gpointer user_data); /** * RTPSessionSendRTCP: @@ -288,7 +288,7 @@ GstFlowReturn rtp_session_process_rtcp (RTPSession *sess, GstBuffer GstClockTime current_time); /* processing packets for sending */ -GstFlowReturn rtp_session_send_rtp (RTPSession *sess, GstBuffer *buffer, +GstFlowReturn rtp_session_send_rtp (RTPSession *sess, gpointer data, gboolean is_list, GstClockTime current_time, guint64 ntpnstime); /* stopping the session */ diff --git a/gst/rtpmanager/rtpsource.c b/gst/rtpmanager/rtpsource.c index ed080717..209c17b5 100644 --- a/gst/rtpmanager/rtpsource.c +++ b/gst/rtpmanager/rtpsource.c @@ -26,7 +26,7 @@ GST_DEBUG_CATEGORY_STATIC (rtp_source_debug); #define GST_CAT_DEFAULT rtp_source_debug -#define RTP_MAX_PROBATION_LEN 32 +#define RTP_MAX_PROBATION_LEN 32 /* signals and args */ enum @@ -1091,41 +1091,73 @@ rtp_source_process_bye (RTPSource * src, const gchar * reason) src->received_bye = TRUE; } +static GstBufferListItem +set_ssrc (GstBuffer ** buffer, guint group, guint idx, RTPSource * src) +{ + *buffer = gst_buffer_make_writable (*buffer); + gst_rtp_buffer_set_ssrc (*buffer, src->ssrc); + return GST_BUFFER_LIST_SKIP_GROUP; +} + /** * rtp_source_send_rtp: * @src: an #RTPSource - * @buffer: an RTP buffer + * @data: an RTP buffer or a list of RTP buffers + * @is_list: if @data is a buffer or list * @ntpnstime: the NTP time when this buffer was captured in nanoseconds. This * is the buffer timestamp converted to NTP time. * - * Send an RTP @buffer originating from @src. This will make @src a sender. - * This function takes ownership of @buffer and modifies the SSRC in the RTP - * packet to that of @src when needed. + * Send @data (an RTP buffer or list of buffers) originating from @src. + * This will make @src a sender. This function takes ownership of @data and + * modifies the SSRC in the RTP packet to that of @src when needed. * * Returns: a #GstFlowReturn. */ GstFlowReturn -rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime) +rtp_source_send_rtp (RTPSource * src, gpointer data, gboolean is_list, + guint64 ntpnstime) { - GstFlowReturn result = GST_FLOW_OK; + GstFlowReturn result; guint len; guint32 rtptime; guint64 ext_rtptime; guint64 ntp_diff, rtp_diff; guint64 elapsed; + GstBufferList *list = NULL; + GstBuffer *buffer = NULL; + guint packets; + guint32 ssrc; g_return_val_if_fail (RTP_IS_SOURCE (src), GST_FLOW_ERROR); - g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR); + g_return_val_if_fail (is_list || GST_IS_BUFFER (data), GST_FLOW_ERROR); - len = gst_rtp_buffer_get_payload_len (buffer); + if (is_list) { + list = GST_BUFFER_LIST_CAST (data); + /* We can grab the caps from the first group, since all + * groups of a buffer list have same caps. */ + buffer = gst_buffer_list_get (list, 0, 0); + if (!buffer) + goto no_buffer; + } else { + buffer = GST_BUFFER_CAST (data); + } rtp_source_update_caps (src, GST_BUFFER_CAPS (buffer)); /* we are a sender now */ src->is_sender = TRUE; + if (is_list) { + /* Each group makes up a network packet. */ + packets = gst_buffer_list_n_groups (list); + len = gst_rtp_buffer_list_get_payload_len (list); + } else { + packets = 1; + len = gst_rtp_buffer_get_payload_len (buffer); + } + /* update stats for the SR */ - src->stats.packets_sent++; + src->stats.packets_sent += packets; src->stats.octets_sent += len; src->bytes_sent += len; @@ -1156,7 +1188,11 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime) src->bitrate = 0; } - rtptime = gst_rtp_buffer_get_timestamp (buffer); + if (is_list) { + rtptime = gst_rtp_buffer_list_get_timestamp (list); + } else { + rtptime = gst_rtp_buffer_get_timestamp (buffer); + } ext_rtptime = src->last_rtptime; ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime); @@ -1180,31 +1216,53 @@ rtp_source_send_rtp (RTPSource * src, GstBuffer * buffer, guint64 ntpnstime) src->last_ntpnstime = ntpnstime; /* push packet */ - if (src->callbacks.push_rtp) { - guint32 ssrc; + if (!src->callbacks.push_rtp) + goto no_callback; + if (is_list) { + ssrc = gst_rtp_buffer_list_get_ssrc (list); + } else { ssrc = gst_rtp_buffer_get_ssrc (buffer); - if (ssrc != src->ssrc) { - /* the SSRC of the packet is not correct, make a writable buffer and - * update the SSRC. This could involve a complete copy of the packet when - * it is not writable. Usually the payloader will use caps negotiation to - * get the correct SSRC from the session manager before pushing anything. */ - buffer = gst_buffer_make_writable (buffer); - - /* FIXME, we don't want to warn yet because we can't inform any payloader - * of the changes SSRC yet because we don't implement pad-alloc. */ - GST_LOG ("updating SSRC from %08x to %08x, fix the payloader", ssrc, - src->ssrc); - gst_rtp_buffer_set_ssrc (buffer, src->ssrc); + } + + if (ssrc != src->ssrc) { + /* the SSRC of the packet is not correct, make a writable buffer and + * update the SSRC. This could involve a complete copy of the packet when + * it is not writable. Usually the payloader will use caps negotiation to + * get the correct SSRC from the session manager before pushing anything. */ + + /* FIXME, we don't want to warn yet because we can't inform any payloader + * of the changes SSRC yet because we don't implement pad-alloc. */ + GST_LOG ("updating SSRC from %08x to %08x, fix the payloader", ssrc, + src->ssrc); + + if (is_list) { + list = gst_buffer_list_make_writable (list); + gst_buffer_list_foreach (list, (GstBufferListFunc) set_ssrc, src); + } else { + set_ssrc (&buffer, 0, 0, src); } - GST_LOG ("pushing RTP packet %" G_GUINT64_FORMAT, src->stats.packets_sent); - result = src->callbacks.push_rtp (src, buffer, src->user_data); - } else { - GST_WARNING ("no callback installed, dropping packet"); - gst_buffer_unref (buffer); } + GST_LOG ("pushing RTP %s %" G_GUINT64_FORMAT, is_list ? "list" : "packet", + src->stats.packets_sent); + + result = src->callbacks.push_rtp (src, data, src->user_data); return result; + + /* ERRORS */ +no_buffer: + { + GST_WARNING ("no buffers in buffer list"); + gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); + return GST_FLOW_OK; + } +no_callback: + { + GST_WARNING ("no callback installed, dropping packet"); + gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); + return GST_FLOW_OK; + } } /** diff --git a/gst/rtpmanager/rtpsource.h b/gst/rtpmanager/rtpsource.h index a44ac1cd..8286f2ec 100644 --- a/gst/rtpmanager/rtpsource.h +++ b/gst/rtpmanager/rtpsource.h @@ -194,7 +194,7 @@ void rtp_source_set_rtcp_from (RTPSource *src, GstNetAddress *a /* handling RTP */ GstFlowReturn rtp_source_process_rtp (RTPSource *src, GstBuffer *buffer, RTPArrivalStats *arrival); -GstFlowReturn rtp_source_send_rtp (RTPSource *src, GstBuffer *buffer, guint64 ntpnstime); +GstFlowReturn rtp_source_send_rtp (RTPSource *src, gpointer data, gboolean is_list, guint64 ntpnstime); /* RTCP messages */ void rtp_source_process_bye (RTPSource *src, const gchar *reason); diff --git a/tests/check/elements/rtpbin_buffer_list.c b/tests/check/elements/rtpbin_buffer_list.c new file mode 100644 index 00000000..af4003dd --- /dev/null +++ b/tests/check/elements/rtpbin_buffer_list.c @@ -0,0 +1,331 @@ +/* GStreamer + * + * Unit test for gstrtpbin sending rtp packets using GstBufferList. + * Copyright (C) 2009 Branko Subasic + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include + +#include + + + +/* This test makes sure that RTP packets sent as buffer lists are sent through + * the rtpbin as they are supposed to, and not corrupted in any way. + */ + + +#define TEST_CAPS \ + "application/x-rtp, " \ + "media=(string)video, " \ + "clock-rate=(int)90000, " \ + "encoding-name=(string)H264, " \ + "profile-level-id=(string)4d4015, " \ + "payload=(int)96, " \ + "ssrc=(guint)2633237432, " \ + "clock-base=(guint)1868267015, " \ + "seqnum-base=(guint)54229" + + +/* RTP headers and the first 2 bytes of the payload (FU indicator and FU header) + */ +static const guint8 rtp_header[2][14] = { + {0x80, 0x60, 0xbb, 0xb7, 0x5c, 0xe9, 0x09, + 0x0d, 0xf5, 0x9c, 0x43, 0x55, 0x1c, 0x86}, + {0x80, 0x60, 0xbb, 0xb8, 0x5c, 0xe9, 0x09, + 0x0d, 0xf5, 0x9c, 0x43, 0x55, 0x1c, 0x46} +}; + +static const guint rtp_header_len[] = { + sizeof rtp_header[0], + sizeof rtp_header[1] +}; + +static GstBuffer *header_buffer[2] = { NULL, NULL }; + + +/* Some payload. + */ +static char *payload = + "0123456789ABSDEF0123456789ABSDEF0123456789ABSDEF0123456789ABSDEF0123456789ABSDEF" + "0123456789ABSDEF0123456789ABSDEF0123456789ABSDEF0123456789ABSDEF0123456789ABSDEF" + "0123456789ABSDEF0123456789ABSDEF0123456789ABSDEF0123456789ABSDEF0123456789ABSDEF" + "0123456789ABSDEF0123456789ABSDEF0123456789ABSDEF0123456789ABSDEF0123456789ABSDEF" + "0123456789ABSDEF0123456789ABSDEF0123456789ABSDEF0123456789ABSDEF0123456789ABSDEF" + "0123456789ABSDEF0123456789ABSDEF0123456789ABSDEF0123456789ABSDEF0123456789ABSDEF" + "0123456789ABSDEF0123456"; + +static const guint payload_offset[] = { + 0, 498 +}; + +static const guint payload_len[] = { + 498, 5 +}; + + +static GstBuffer *original_buffer = NULL; + +static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp")); + +static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("application/x-rtp")); + + +static GstBuffer * +_create_original_buffer (void) +{ + GstCaps *caps; + + if (original_buffer != NULL) + return original_buffer; + + original_buffer = gst_buffer_new (); + fail_unless (original_buffer != NULL); + + gst_buffer_set_data (original_buffer, (guint8 *) payload, strlen (payload)); + GST_BUFFER_TIMESTAMP (original_buffer) = + gst_clock_get_internal_time (gst_system_clock_obtain ()); + + caps = gst_caps_from_string (TEST_CAPS); + fail_unless (caps != NULL); + gst_buffer_set_caps (original_buffer, caps); + gst_caps_unref (caps); + + return original_buffer; +} + +static GstBufferList * +_create_buffer_list (void) +{ + GstBufferList *list; + GstBufferListIterator *it; + GstBuffer *orig_buffer; + GstBuffer *buffer; + + orig_buffer = _create_original_buffer (); + fail_if (orig_buffer == NULL); + + list = gst_buffer_list_new (); + fail_if (list == NULL); + + it = gst_buffer_list_iterate (list); + fail_if (it == NULL); + + /*** First group, i.e. first packet. **/ + gst_buffer_list_iterator_add_group (it); + + /* Create buffer with RTP header and add it to the 1st group */ + buffer = gst_buffer_new (); + GST_BUFFER_MALLOCDATA (buffer) = g_memdup (&rtp_header[0], rtp_header_len[0]); + GST_BUFFER_DATA (buffer) = GST_BUFFER_MALLOCDATA (buffer); + GST_BUFFER_SIZE (buffer) = rtp_header_len[0]; + gst_buffer_copy_metadata (buffer, orig_buffer, GST_BUFFER_COPY_ALL); + header_buffer[0] = buffer; + gst_buffer_list_iterator_add (it, buffer); + + /* Create the payload buffer and add it to the 1st group + */ + buffer = + gst_buffer_create_sub (orig_buffer, payload_offset[0], payload_len[0]); + fail_if (buffer == NULL); + gst_buffer_list_iterator_add (it, buffer); + + + /*** Second group, i.e. second packet. ***/ + + /* Create a new group to hold the rtp header and the payload */ + gst_buffer_list_iterator_add_group (it); + + /* Create buffer with RTP header and add it to the 2nd group */ + buffer = gst_buffer_new (); + GST_BUFFER_MALLOCDATA (buffer) = g_memdup (&rtp_header[1], rtp_header_len[1]); + GST_BUFFER_DATA (buffer) = GST_BUFFER_MALLOCDATA (buffer); + GST_BUFFER_SIZE (buffer) = rtp_header_len[1]; + gst_buffer_copy_metadata (buffer, orig_buffer, GST_BUFFER_COPY_ALL); + header_buffer[1] = buffer; + + /* Add the rtp header to the buffer list */ + gst_buffer_list_iterator_add (it, buffer); + + /* Create the payload buffer and add it to the 2d group + */ + buffer = + gst_buffer_create_sub (orig_buffer, payload_offset[1], payload_len[1]); + fail_if (buffer == NULL); + gst_buffer_list_iterator_add (it, buffer); + + gst_buffer_list_iterator_free (it); + + return list; +} + + +static void +_check_header (GstBuffer * buffer, guint index) +{ + guint8 *data; + + fail_if (buffer == NULL); + fail_unless (index < 2); + + fail_unless (GST_BUFFER_SIZE (buffer) == rtp_header_len[index]); + + /* Can't do a memcmp() on the whole header, cause the SSRC (bytes 8-11) will + * most likely be changed in gstrtpbin. + */ + fail_unless ((data = GST_BUFFER_DATA (buffer)) != NULL); + fail_unless_equals_uint64 (*(guint64 *) data, *(guint64 *) rtp_header[index]); + fail_unless (*(guint16 *) (data + 12) == + *(guint16 *) (rtp_header[index] + 12)); +} + + +static void +_check_payload (GstBuffer * buffer, guint index) +{ + fail_if (buffer == NULL); + fail_unless (index < 2); + + fail_unless (GST_BUFFER_SIZE (buffer) == payload_len[index]); + fail_if (GST_BUFFER_DATA (buffer) != + (gpointer) (payload + payload_offset[index])); + fail_if (memcmp (GST_BUFFER_DATA (buffer), payload + payload_offset[index], + payload_len[index])); +} + + +static void +_check_group (GstBufferListIterator * it, guint index, GstCaps * caps) +{ + GstBuffer *buffer; + + fail_unless (it != NULL); + fail_unless (gst_buffer_list_iterator_n_buffers (it) == 2); + fail_unless (caps != NULL); + + fail_unless ((buffer = gst_buffer_list_iterator_next (it)) != NULL); + + fail_unless (GST_BUFFER_TIMESTAMP (buffer) == + GST_BUFFER_TIMESTAMP (original_buffer)); + + fail_unless (gst_caps_is_equal (GST_BUFFER_CAPS (original_buffer), + GST_BUFFER_CAPS (buffer))); + + _check_header (buffer, index); + + fail_unless ((buffer = gst_buffer_list_iterator_next (it)) != NULL); + _check_payload (buffer, index); +} + + +static GstFlowReturn +_sink_chain_list (GstPad * pad, GstBufferList * list) +{ + GstCaps *caps; + GstBufferListIterator *it; + + caps = gst_caps_from_string (TEST_CAPS); + fail_unless (caps != NULL); + + fail_unless (GST_IS_BUFFER_LIST (list)); + fail_unless (gst_buffer_list_n_groups (list) == 2); + + it = gst_buffer_list_iterate (list); + fail_if (it == NULL); + + fail_unless (gst_buffer_list_iterator_next_group (it)); + _check_group (it, 0, caps); + + fail_unless (gst_buffer_list_iterator_next_group (it)); + _check_group (it, 1, caps); + + gst_caps_unref (caps); + gst_buffer_list_iterator_free (it); + + gst_buffer_list_unref (list); + + return GST_FLOW_OK; +} + + +static void +_set_chain_functions (GstPad * pad) +{ + gst_pad_set_chain_list_function (pad, _sink_chain_list); +} + + +GST_START_TEST (test_bufferlist) +{ + GstElement *rtpbin; + GstPad *sinkpad; + GstPad *srcpad; + GstBufferList *list; + + list = _create_buffer_list (); + fail_unless (list != NULL); + + rtpbin = gst_check_setup_element ("gstrtpbin"); + + srcpad = + gst_check_setup_src_pad_by_name (rtpbin, &srctemplate, "send_rtp_sink_0"); + fail_if (srcpad == NULL); + sinkpad = + gst_check_setup_sink_pad_by_name (rtpbin, &sinktemplate, + "send_rtp_src_0"); + fail_if (sinkpad == NULL); + + _set_chain_functions (sinkpad); + + gst_pad_set_active (sinkpad, TRUE); + gst_element_set_state (rtpbin, GST_STATE_PLAYING); + fail_unless (gst_pad_push_list (srcpad, list) == GST_FLOW_OK); + gst_pad_set_active (sinkpad, FALSE); + + gst_check_teardown_pad_by_name (rtpbin, "send_rtp_src_0"); + gst_check_teardown_pad_by_name (rtpbin, "send_rtp_sink_0"); + gst_check_teardown_element (rtpbin); +} + +GST_END_TEST; + + + +static Suite * +bufferlist_suite (void) +{ + Suite *s = suite_create ("BufferList"); + + TCase *tc_chain = tcase_create ("general"); + + /* time out after 30s. */ + tcase_set_timeout (tc_chain, 10); + + suite_add_tcase (s, tc_chain); + tcase_add_test (tc_chain, test_bufferlist); + + return s; +} + +GST_CHECK_MAIN (bufferlist); -- cgit