diff options
author | Wim Taymans <wim.taymans@gmail.com> | 2006-08-16 09:48:26 +0000 |
---|---|---|
committer | Wim Taymans <wim.taymans@gmail.com> | 2006-08-16 09:48:26 +0000 |
commit | 6eedcfbc8c493079db829b5f67ccd1bd791c1313 (patch) | |
tree | 698ae29c418336a8acc2ed4c8262690e9eb75510 /gst/rtsp | |
parent | 64faced49c74be6cfb7c8b215e7e782dec6ed5ee (diff) |
gst/rtsp/gstrtpdec.c: Add pads after setting them up.
Original commit message from CVS:
* gst/rtsp/gstrtpdec.c: (gst_rtpdec_init), (gst_rtpdec_getcaps):
Add pads after setting them up.
* gst/rtsp/gstrtspsrc.c: (gst_rtspsrc_class_init),
(gst_rtspsrc_init), (gst_rtspsrc_finalize),
(gst_rtspsrc_free_stream), (gst_rtspsrc_media_to_caps),
(gst_rtspsrc_stream_setup_rtp),
(gst_rtspsrc_stream_configure_transport),
(gst_rtspsrc_combine_flows), (gst_rtspsrc_loop),
(gst_rtspsrc_open), (gst_rtspsrc_close), (gst_rtspsrc_play),
(gst_rtspsrc_pause):
* gst/rtsp/gstrtspsrc.h:
Fix interleaved mode.
- Protect streaming with lock.
- Combine flows
- set caps on outgoing buffers.
- strip trailing \0 from data packets.
- Configure RTP/RTCP in stream.
Use DEBUG_OBJECT more.
Diffstat (limited to 'gst/rtsp')
-rw-r--r-- | gst/rtsp/gstrtpdec.c | 6 | ||||
-rw-r--r-- | gst/rtsp/gstrtspsrc.c | 230 | ||||
-rw-r--r-- | gst/rtsp/gstrtspsrc.h | 38 |
3 files changed, 205 insertions, 69 deletions
diff --git a/gst/rtsp/gstrtpdec.c b/gst/rtsp/gstrtpdec.c index 24054d83..a8d6dddd 100644 --- a/gst/rtsp/gstrtpdec.c +++ b/gst/rtsp/gstrtpdec.c @@ -170,16 +170,16 @@ gst_rtpdec_init (GstRTPDec * rtpdec) rtpdec->sink_rtp = gst_pad_new_from_static_template (&gst_rtpdec_sink_rtp_template, "sinkrtp"); - gst_element_add_pad (GST_ELEMENT (rtpdec), rtpdec->sink_rtp); gst_pad_set_getcaps_function (rtpdec->sink_rtp, gst_rtpdec_getcaps); gst_pad_set_chain_function (rtpdec->sink_rtp, gst_rtpdec_chain_rtp); + gst_element_add_pad (GST_ELEMENT (rtpdec), rtpdec->sink_rtp); /* the input rtcp pad */ rtpdec->sink_rtcp = gst_pad_new_from_static_template (&gst_rtpdec_sink_rtcp_template, "sinkrtcp"); - gst_element_add_pad (GST_ELEMENT (rtpdec), rtpdec->sink_rtcp); gst_pad_set_chain_function (rtpdec->sink_rtcp, gst_rtpdec_chain_rtcp); + gst_element_add_pad (GST_ELEMENT (rtpdec), rtpdec->sink_rtcp); /* the output rtp pad */ rtpdec->src_rtp = @@ -203,7 +203,7 @@ gst_rtpdec_getcaps (GstPad * pad) src = GST_RTPDEC (GST_PAD_PARENT (pad)); - other = pad == src->src_rtp ? src->sink_rtp : src->src_rtp; + other = (pad == src->src_rtp ? src->sink_rtp : src->src_rtp); caps = gst_pad_peer_get_caps (other); diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index fd2c476e..10bbedab 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -135,6 +135,7 @@ gst_rtsp_proto_get_type (void) static void gst_rtspsrc_base_init (gpointer g_class); static void gst_rtspsrc_class_init (GstRTSPSrc * klass); static void gst_rtspsrc_init (GstRTSPSrc * rtspsrc); +static void gst_rtspsrc_finalize (GObject * object); static void gst_rtspsrc_uri_handler_init (gpointer g_iface, gpointer iface_data); @@ -216,6 +217,8 @@ gst_rtspsrc_class_init (GstRTSPSrc * klass) gobject_class->set_property = gst_rtspsrc_set_property; gobject_class->get_property = gst_rtspsrc_get_property; + gobject_class->finalize = gst_rtspsrc_finalize; + g_object_class_install_property (gobject_class, PROP_LOCATION, g_param_spec_string ("location", "RTSP Location", "Location of the RTSP url to read", @@ -243,6 +246,21 @@ gst_rtspsrc_class_init (GstRTSPSrc * klass) static void gst_rtspsrc_init (GstRTSPSrc * src) { + src->stream_rec_lock = g_new (GStaticRecMutex, 1); + g_static_rec_mutex_init (src->stream_rec_lock); +} + +static void +gst_rtspsrc_finalize (GObject * object) +{ + GstRTSPSrc *rtspsrc; + + rtspsrc = GST_RTSPSRC (object); + + g_static_rec_mutex_free (rtspsrc->stream_rec_lock); + g_free (rtspsrc->stream_rec_lock); + + G_OBJECT_CLASS (parent_class)->finalize (object); } static void @@ -314,6 +332,22 @@ gst_rtspsrc_create_stream (GstRTSPSrc * src) return s; } +#if 0 +static void +gst_rtspsrc_free_stream (GstRTSPSrc * src, GstRTSPStream * stream) +{ + if (stream->caps) { + gst_caps_unref (stream->caps); + stream->caps = NULL; + } + + src->streams = g_list_remove (src->streams, stream); + src->numstreams--; + + g_free (stream); +} +#endif + static gboolean gst_rtspsrc_add_element (GstRTSPSrc * src, GstElement * element) { @@ -437,7 +471,7 @@ gst_rtspsrc_parse_rtpmap (gchar * rtpmap, gint * payload, gchar ** name, * * m=<media> <udp port> RTP/AVP <payload> * a=rtpmap:<payload> <encoding_name>/<clock_rate>[/<encoding_params>] - * a=fmtp:<payload> <param>=<value>;... + * a=fmtp:<payload> <param>[=<value>];... */ static GstCaps * gst_rtspsrc_media_to_caps (SDPMedia * media) @@ -502,34 +536,37 @@ gst_rtspsrc_media_to_caps (SDPMedia * media) p = fmtp; + /* p is now of the format <payload> <param>[=<value>];... */ PARSE_INT (p, " ", payload); if (payload != -1 && payload == pt) { gchar **pairs; gint i; + /* <param>[=<value>] are separated with ';' */ pairs = g_strsplit (p, ";", 0); for (i = 0; pairs[i]; i++) { - gchar **keyval; - - keyval = g_strsplit (pairs[i], "=", 0); - if (keyval[0]) { - gchar *val, *key; - - if (keyval[1]) - val = g_strstrip (keyval[1]); - else - val = "1"; - - key = g_strstrip (keyval[0]); - - gst_structure_set (s, key, G_TYPE_STRING, val, NULL); + gchar *valpos; + gchar *val, *key; + + /* the key may not have a '=', the value can have other '='s */ + valpos = strstr (pairs[i], "="); + if (valpos) { + /* we have a '=' and thus a value, remove the '=' with \0 */ + *valpos = '\0'; + /* value is everything between '=' and ';' */ + val = g_strstrip (valpos + 1); + } else { + /* simple <param>;.. is translated into <param>=1;... */ + val = "1"; } - g_strfreev (keyval); + /* strip the key of spaces */ + key = g_strstrip (pairs[i]); + + gst_structure_set (s, key, G_TYPE_STRING, val, NULL); } g_strfreev (pairs); } } - return caps; } @@ -630,32 +667,33 @@ again: /* ERRORS */ no_udp_rtp_protocol: { - GST_DEBUG ("could not get UDP source for RTP"); + GST_DEBUG_OBJECT (src, "could not get UDP source for RTP"); goto cleanup; } start_rtp_failure: { - GST_DEBUG ("could not start UDP source for RTP"); + GST_DEBUG_OBJECT (src, "could not start UDP source for RTP"); goto cleanup; } no_ports: { - GST_DEBUG ("could not allocate UDP port pair after %d retries", count); + GST_DEBUG_OBJECT (src, "could not allocate UDP port pair after %d retries", + count); goto cleanup; } no_udp_rtcp_protocol: { - GST_DEBUG ("could not get UDP source for RTCP"); + GST_DEBUG_OBJECT (src, "could not get UDP source for RTCP"); goto cleanup; } start_rtcp_failure: { - GST_DEBUG ("could not start UDP source for RTCP"); + GST_DEBUG_OBJECT (src, "could not start UDP source for RTCP"); goto cleanup; } port_error: { - GST_DEBUG ("ports don't match rtp: %d<->%d, rtcp: %d<->%d", + GST_DEBUG_OBJECT (src, "ports don't match rtp: %d<->%d, rtcp: %d<->%d", tmp_rtp, *rtpport, tmp_rtcp, *rtcpport); goto cleanup; } @@ -679,7 +717,7 @@ cleanup: static gboolean gst_rtspsrc_stream_configure_transport (GstRTSPStream * stream, - RTSPTransport * transport) + SDPMedia * media, RTSPTransport * transport) { GstRTSPSrc *src; GstPad *pad; @@ -688,6 +726,8 @@ gst_rtspsrc_stream_configure_transport (GstRTSPStream * stream, src = stream->parent; + GST_DEBUG ("configuring RTP transport for stream %p", stream); + if (!(stream->rtpdec = gst_element_factory_make ("rtpdec", NULL))) goto no_element; @@ -706,6 +746,13 @@ gst_rtspsrc_stream_configure_transport (GstRTSPStream * stream, /* configure for interleaved delivery, nothing needs to be done * here, the loop function will call the chain functions of the * rtp session manager. */ + stream->rtpchannel = transport->interleaved.min; + stream->rtcpchannel = transport->interleaved.max; + GST_DEBUG ("stream %p on channels %d-%d", stream, + stream->rtpchannel, stream->rtcpchannel); + + /* also store the caps in the stream */ + stream->caps = gst_rtspsrc_media_to_caps (media); } else { /* configure for UDP delivery, we need to connect the udp pads to * the rtp session plugin. */ @@ -719,6 +766,10 @@ gst_rtspsrc_stream_configure_transport (GstRTSPStream * stream, } pad = gst_element_get_pad (stream->rtpdec, "srcrtp"); + if (stream->caps) { + gst_pad_use_fixed_caps (pad); + gst_pad_set_caps (pad, stream->caps); + } name = g_strdup_printf ("rtp_stream%d", stream->id); gst_element_add_pad (GST_ELEMENT_CAST (src), gst_ghost_pad_new (name, pad)); g_free (name); @@ -726,14 +777,15 @@ gst_rtspsrc_stream_configure_transport (GstRTSPStream * stream, return TRUE; + /* ERRORS */ no_element: { - GST_DEBUG ("no rtpdec element found"); + GST_DEBUG_OBJECT (src, "no rtpdec element found"); return FALSE; } start_rtpdec_failure: { - GST_DEBUG ("could not start RTP session"); + GST_DEBUG_OBJECT (src, "could not start RTP session"); return FALSE; } } @@ -749,6 +801,40 @@ find_stream (GstRTSPStream * stream, gconstpointer a) return -1; } +static GstFlowReturn +gst_rtspsrc_combine_flows (GstRTSPSrc * src, GstRTSPStream * stream, + GstFlowReturn ret) +{ + GList *streams; + + /* store the value */ + stream->last_ret = ret; + + /* if it's success we can return the value right away */ + if (GST_FLOW_IS_SUCCESS (ret)) + goto done; + + /* any other error that is not-linked can be returned right + * away */ + if (ret != GST_FLOW_NOT_LINKED) + goto done; + + /* only return NOT_LINKED if all other pads returned NOT_LINKED */ + for (streams = src->streams; streams; streams = g_list_next (streams)) { + GstRTSPStream *ostream = (GstRTSPStream *) streams->data; + + ret = ostream->last_ret; + /* some other return value (must be SUCCESS but we can return + * other values as well) */ + if (ret != GST_FLOW_NOT_LINKED) + goto done; + } + /* if we get here, all other pads were unlinked and we return + * NOT_LINKED then */ +done: + return ret; +} + static void gst_rtspsrc_loop (GstRTSPSrc * src) { @@ -760,12 +846,14 @@ gst_rtspsrc_loop (GstRTSPSrc * src) GstPad *outpad = NULL; guint8 *data; guint size; + GstFlowReturn ret = GST_FLOW_OK; + GstCaps *caps = NULL; do { - GST_DEBUG ("doing reveive"); + GST_DEBUG_OBJECT (src, "doing receive"); if ((res = rtsp_connection_receive (src->connection, &response)) < 0) goto receive_error; - GST_DEBUG ("got packet"); + GST_DEBUG_OBJECT (src, "got packet type %d", response.type); } while (response.type != RTSP_MESSAGE_DATA); @@ -777,10 +865,12 @@ gst_rtspsrc_loop (GstRTSPSrc * src) goto unknown_stream; stream = (GstRTSPStream *) lstream->data; - if (channel == stream->rtpchannel) + if (channel == stream->rtpchannel) { outpad = stream->rtpdecrtp; - else if (channel == stream->rtcpchannel) + caps = stream->caps; + } else if (channel == stream->rtcpchannel) { outpad = stream->rtpdecrtcp; + } rtsp_message_get_body (&response, &data, &size); @@ -798,21 +888,39 @@ gst_rtspsrc_loop (GstRTSPSrc * src) { GstBuffer *buf; + /* strip the trailing \0 */ + size -= 1; + buf = gst_buffer_new_and_alloc (size); memcpy (GST_BUFFER_DATA (buf), data, size); - if (gst_pad_chain (outpad, buf) != GST_FLOW_OK) - goto need_pause; - } + if (caps) + gst_buffer_set_caps (buf, caps); -unknown_stream: + GST_DEBUG_OBJECT (src, "pushing data of size %d on channel %d", size, + channel); + + /* chain to the peer pad */ + ret = gst_pad_chain (outpad, buf); + /* combine all streams */ + ret = gst_rtspsrc_combine_flows (src, stream, ret); + if (ret != GST_FLOW_OK) + goto need_pause; + } return; + /* ERRORS */ +unknown_stream: + { + GST_DEBUG_OBJECT (src, "unknown stream on channel %d, ignored", channel); + return; + } receive_error: { GST_ELEMENT_ERROR (src, RESOURCE, WRITE, ("Could not receive message."), (NULL)); + ret = GST_FLOW_UNEXPECTED; /* gst_pad_push_event (src->srcpad, gst_event_new (GST_EVENT_EOS)); */ @@ -820,6 +928,8 @@ receive_error: } need_pause: { + GST_DEBUG_OBJECT (src, "pausing task, reason %d (%s)", ret, + gst_flow_get_name (ret)); gst_task_pause (src->task); return; } @@ -886,24 +996,24 @@ gst_rtspsrc_open (GstRTSPSrc * src) GstRTSPProto protocols; /* parse url */ - GST_DEBUG ("parsing url..."); + GST_DEBUG_OBJECT (src, "parsing url..."); if ((res = rtsp_url_parse (src->location, &url)) < 0) goto invalid_url; /* open connection */ - GST_DEBUG ("opening connection..."); + GST_DEBUG_OBJECT (src, "opening connection..."); if ((res = rtsp_connection_open (url, &src->connection)) < 0) goto could_not_open; /* create OPTIONS */ - GST_DEBUG ("create options..."); + GST_DEBUG_OBJECT (src, "create options..."); if ((res = rtsp_message_init_request (RTSP_OPTIONS, src->location, &request)) < 0) goto create_request_failed; /* send OPTIONS */ - GST_DEBUG ("send options..."); + GST_DEBUG_OBJECT (src, "send options..."); if (!gst_rtspsrc_send (src, &request, &response, NULL)) goto send_error; @@ -955,7 +1065,7 @@ gst_rtspsrc_open (GstRTSPSrc * src) } /* create DESCRIBE */ - GST_DEBUG ("create describe..."); + GST_DEBUG_OBJECT (src, "create describe..."); if ((res = rtsp_message_init_request (RTSP_DESCRIBE, src->location, &request)) < 0) @@ -964,7 +1074,7 @@ gst_rtspsrc_open (GstRTSPSrc * src) rtsp_message_add_header (&request, RTSP_HDR_ACCEPT, "application/sdp"); /* send DESCRIBE */ - GST_DEBUG ("send describe..."); + GST_DEBUG_OBJECT (src, "send describe..."); if (!gst_rtspsrc_send (src, &request, &response, NULL)) goto send_error; @@ -984,7 +1094,7 @@ gst_rtspsrc_open (GstRTSPSrc * src) /* parse SDP */ rtsp_message_get_body (&response, &data, &size); - GST_DEBUG ("parse sdp..."); + GST_DEBUG_OBJECT (src, "parse sdp..."); sdp_message_init (&sdp); sdp_message_parse_buffer (data, size, &sdp); @@ -1008,10 +1118,10 @@ gst_rtspsrc_open (GstRTSPSrc * src) stream = gst_rtspsrc_create_stream (src); - GST_DEBUG ("setup media %d", i); + GST_DEBUG_OBJECT (src, "setup media %d", i); control_url = sdp_media_get_attribute_val (media, "control"); if (control_url == NULL) { - GST_DEBUG ("no control url found, skipping stream"); + GST_DEBUG_OBJECT (src, "no control url found, skipping stream"); continue; } @@ -1023,7 +1133,7 @@ gst_rtspsrc_open (GstRTSPSrc * src) setup_url = g_strdup_printf ("%s/%s", src->location, control_url); } - GST_DEBUG ("setup %s", setup_url); + GST_DEBUG_OBJECT (src, "setup %s", setup_url); /* create SETUP request */ if ((res = rtsp_message_init_request (RTSP_SETUP, setup_url, @@ -1043,6 +1153,8 @@ gst_rtspsrc_open (GstRTSPSrc * src) if (!gst_rtspsrc_stream_setup_rtp (stream, media, &rtpport, &rtcpport)) goto setup_rtp_failed; + GST_DEBUG_OBJECT (src, "setting up RTP ports %d-%d", rtpport, rtcpport); + trxparams = g_strdup_printf ("client_port=%d-%d", rtpport, rtcpport); new = g_strconcat (transports, "RTP/AVP/UDP;unicast;", trxparams, NULL); g_free (trxparams); @@ -1052,6 +1164,8 @@ gst_rtspsrc_open (GstRTSPSrc * src) if (protocols & GST_RTSP_PROTO_UDP_MULTICAST) { gchar *new; + GST_DEBUG_OBJECT (src, "setting up MULTICAST"); + new = g_strconcat (transports, transports[0] ? "," : "", "RTP/AVP/UDP;multicast", NULL); @@ -1061,6 +1175,8 @@ gst_rtspsrc_open (GstRTSPSrc * src) if (protocols & GST_RTSP_PROTO_TCP) { gchar *new; + GST_DEBUG_OBJECT (src, "setting up TCP"); + new = g_strconcat (transports, transports[0] ? "," : "", "RTP/AVP/TCP", NULL); @@ -1088,20 +1204,24 @@ gst_rtspsrc_open (GstRTSPSrc * src) rtsp_transport_parse (resptrans, &transport); /* update allowed transports for other streams */ if (transport.lower_transport == RTSP_LOWER_TRANS_TCP) { + GST_DEBUG_OBJECT (src, "stream %d as TCP", i); protocols = GST_RTSP_PROTO_TCP; src->interleaved = TRUE; } else { if (transport.multicast) { /* disable unicast */ + GST_DEBUG_OBJECT (src, "stream %d as MULTICAST", i); protocols = GST_RTSP_PROTO_UDP_MULTICAST; } else { /* disable multicast */ + GST_DEBUG_OBJECT (src, "stream %d as UNICAST", i); protocols = GST_RTSP_PROTO_UDP_UNICAST; } } /* now configure the stream with the transport */ - if (!gst_rtspsrc_stream_configure_transport (stream, &transport)) { - GST_DEBUG ("could not configure stream transport, skipping stream"); + if (!gst_rtspsrc_stream_configure_transport (stream, media, &transport)) { + GST_DEBUG_OBJECT (src, + "could not configure stream transport, skipping stream"); } /* clean up our transport struct */ rtsp_transport_init (&transport); @@ -1173,11 +1293,20 @@ gst_rtspsrc_close (GstRTSPSrc * src) RTSPMessage response = { 0 }; RTSPResult res; - GST_DEBUG ("TEARDOWN..."); + GST_DEBUG_OBJECT (src, "TEARDOWN..."); /* stop task if any */ if (src->task) { gst_task_stop (src->task); + + /* make sure it is not running */ + g_static_rec_mutex_lock (src->stream_rec_lock); + g_static_rec_mutex_unlock (src->stream_rec_lock); + + /* no wait for the task to finish */ + gst_task_join (src->task); + + /* and free the task */ gst_object_unref (GST_OBJECT (src->task)); src->task = NULL; } @@ -1194,7 +1323,7 @@ gst_rtspsrc_close (GstRTSPSrc * src) } /* close connection */ - GST_DEBUG ("closing connection..."); + GST_DEBUG_OBJECT (src, "closing connection..."); if ((res = rtsp_connection_close (src->connection)) < 0) goto close_failed; @@ -1229,7 +1358,7 @@ gst_rtspsrc_play (GstRTSPSrc * src) if (!(src->options & RTSP_PLAY)) return TRUE; - GST_DEBUG ("PLAY..."); + GST_DEBUG_OBJECT (src, "PLAY..."); /* do play */ if ((res = @@ -1242,6 +1371,7 @@ gst_rtspsrc_play (GstRTSPSrc * src) if (src->interleaved) { src->task = gst_task_create ((GstTaskFunction) gst_rtspsrc_loop, src); + gst_task_set_lock (src->task, src->stream_rec_lock); gst_task_start (src->task); } @@ -1271,7 +1401,7 @@ gst_rtspsrc_pause (GstRTSPSrc * src) if (!(src->options & RTSP_PAUSE)) return TRUE; - GST_DEBUG ("PAUSE..."); + GST_DEBUG_OBJECT (src, "PAUSE..."); /* do pause */ if ((res = rtsp_message_init_request (RTSP_PAUSE, src->location, &request)) < 0) diff --git a/gst/rtsp/gstrtspsrc.h b/gst/rtsp/gstrtspsrc.h index fa19677d..424c512a 100644 --- a/gst/rtsp/gstrtspsrc.h +++ b/gst/rtsp/gstrtspsrc.h @@ -55,19 +55,23 @@ typedef struct _GstRTSPStream GstRTSPStream; struct _GstRTSPStream { gint id; + GstRTSPSrc *parent; + + GstFlowReturn last_ret; + + /* for interleaved mode */ gint rtpchannel; gint rtcpchannel; + GstCaps *caps; - GstRTSPSrc *parent; - - /* our udp sources */ + /* our udp sources for RTP */ GstElement *rtpsrc; GstElement *rtcpsrc; /* our udp sink back to the server */ GstElement *rtcpsink; - /* the rtp decoder */ + /* the RTP decoder */ GstElement *rtpdec; GstPad *rtpdecrtp; GstPad *rtpdecrtcp; @@ -76,23 +80,25 @@ struct _GstRTSPStream { struct _GstRTSPSrc { GstElement element; - gboolean interleaved; - GstTask *task; + /* task and mutex for interleaved mode */ + gboolean interleaved; + GstTask *task; + GStaticRecMutex *stream_rec_lock; - gint numstreams; - GList *streams; + gint numstreams; + GList *streams; - gchar *location; - gboolean debug; - guint retry; + gchar *location; + gboolean debug; + guint retry; - GstRTSPProto protocols; + GstRTSPProto protocols; /* supported options */ - gint options; + gint options; - RTSPConnection *connection; - RTSPMessage *request; - RTSPMessage *response; + RTSPConnection *connection; + RTSPMessage *request; + RTSPMessage *response; }; struct _GstRTSPSrcClass { |