diff options
Diffstat (limited to 'gst/rtsp/gstrtspsrc.c')
-rw-r--r-- | gst/rtsp/gstrtspsrc.c | 281 |
1 files changed, 217 insertions, 64 deletions
diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index ee383a45..a99a102a 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -290,8 +290,6 @@ gst_rtspsrc_init (GstRTSPSrc * src, GstRTSPSrcClass * g_class) src->stream_rec_lock = g_new (GStaticRecMutex, 1); g_static_rec_mutex_init (src->stream_rec_lock); - src->loop_cond = g_cond_new (); - src->location = g_strdup (DEFAULT_LOCATION); src->url = NULL; @@ -316,7 +314,6 @@ gst_rtspsrc_finalize (GObject * object) g_static_rec_mutex_free (rtspsrc->stream_rec_lock); g_free (rtspsrc->stream_rec_lock); - g_cond_free (rtspsrc->loop_cond); g_free (rtspsrc->location); g_free (rtspsrc->req_location); g_free (rtspsrc->content_base); @@ -1280,8 +1277,8 @@ use_no_manager: goto no_element; /* take ownership */ - gst_object_ref (stream->udpsrc[0]); - gst_object_sink (stream->udpsrc[0]); + gst_object_ref (stream->udpsrc[1]); + gst_object_sink (stream->udpsrc[1]); gst_element_set_state (stream->udpsrc[1], GST_STATE_READY); } @@ -1350,9 +1347,11 @@ use_no_manager: else port = transport->server_port.max; - destination = transport->destination; + /* first take the source, then the endpoint to figure out where to send + * the RTCP. */ + destination = transport->source; if (destination == NULL) - destination = src->addr; + destination = src->connection->ip; GST_DEBUG_OBJECT (src, "configure UDP sink for %s:%d", destination, port); @@ -1375,7 +1374,7 @@ use_no_manager: stream->rtcppad = gst_element_get_pad (stream->udpsink, "sink"); /* get session RTCP pad */ - name = g_strdup_printf ("rtcp_src_%d", stream->id); + name = g_strdup_printf ("send_rtcp_src_%d", stream->id); pad = gst_element_get_request_pad (src->session, name); g_free (name); @@ -1557,10 +1556,43 @@ gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event) gst_event_unref (event); } +/* FIXME, handle server request, reply with OK, for now */ +static RTSPResult +gst_rtspsrc_handle_request (GstRTSPSrc * src, RTSPMessage * request) +{ + RTSPMessage response = { 0 }; + RTSPResult res; + + GST_DEBUG_OBJECT (src, "got server request message"); + + if (src->debug) + rtsp_message_dump (request); + + res = rtsp_message_init_response (&response, RTSP_STS_OK, "OK", request); + if (res < 0) + goto send_error; + + GST_DEBUG_OBJECT (src, "replying with OK"); + + if (src->debug) + rtsp_message_dump (&response); + + if ((res = rtsp_connection_send (src->connection, &response, NULL)) < 0) + goto send_error; + + return RTSP_OK; + + /* ERRORS */ +send_error: + { + return res; + } +} + static void gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) { - RTSPMessage response = { 0 }; + RTSPMessage message = { 0 }; RTSPResult res; gint channel; GList *lstream; @@ -1570,18 +1602,38 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) guint size; GstFlowReturn ret = GST_FLOW_OK; GstBuffer *buf; - gboolean is_rtcp = FALSE; + gboolean is_rtcp, have_data; + have_data = FALSE; do { GST_DEBUG_OBJECT (src, "doing receive"); - if ((res = rtsp_connection_receive (src->connection, &response)) < 0) + + if ((res = rtsp_connection_receive (src->connection, &message, NULL)) < 0) goto receive_error; - GST_DEBUG_OBJECT (src, "got packet type %d", response.type); + switch (message.type) { + case RTSP_MESSAGE_REQUEST: + /* server sends us a request message, handle it */ + if ((res = gst_rtspsrc_handle_request (src, &message)) < 0) + goto handle_request_failed; + break; + case RTSP_MESSAGE_RESPONSE: + /* we ignore response messages */ + GST_DEBUG_OBJECT (src, "ignoring response message"); + break; + case RTSP_MESSAGE_DATA: + GST_DEBUG_OBJECT (src, "got data message"); + have_data = TRUE; + break; + default: + GST_WARNING_OBJECT (src, "ignoring unknown message type %d", + message.type); + break; + } } - while (response.type != RTSP_MESSAGE_DATA); + while (!have_data); - channel = response.type_data.data.channel; + channel = message.type_data.data.channel; lstream = g_list_find_custom (src->streams, GINT_TO_POINTER (channel), (GCompareFunc) find_stream_by_channel); @@ -1591,13 +1643,16 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) stream = (GstRTSPStream *) lstream->data; if (channel == stream->channel[0]) { outpad = stream->channelpad[0]; + is_rtcp = FALSE; } else if (channel == stream->channel[1]) { outpad = stream->channelpad[1]; is_rtcp = TRUE; + } else { + is_rtcp = FALSE; } /* take a look at the body to figure out what we have */ - rtsp_message_get_body (&response, &data, &size); + rtsp_message_get_body (&message, &data, &size); if (size < 2) goto invalid_length; @@ -1612,8 +1667,8 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) if (outpad == NULL) goto unknown_stream; - /* and chain buffer to internal element */ - rtsp_message_steal_body (&response, &data, &size); + /* take the message body for further processing */ + rtsp_message_steal_body (&message, &data, &size); /* strip the trailing \0 */ size -= 1; @@ -1624,7 +1679,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) GST_BUFFER_SIZE (buf) = size; /* don't need message anymore */ - rtsp_message_unset (&response); + rtsp_message_unset (&message); GST_DEBUG_OBJECT (src, "pushing data of size %d on channel %d", size, channel); @@ -1652,7 +1707,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src) unknown_stream: { GST_DEBUG_OBJECT (src, "unknown stream on channel %d, ignored", channel); - rtsp_message_unset (&response); + rtsp_message_unset (&message); return; } receive_error: @@ -1664,9 +1719,20 @@ receive_error: g_free (str); if (src->debug) - rtsp_message_dump (&response); + rtsp_message_dump (&message); - rtsp_message_unset (&response); + rtsp_message_unset (&message); + ret = GST_FLOW_UNEXPECTED; + goto need_pause; + } +handle_request_failed: + { + gchar *str = rtsp_strresult (res); + + GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL), + ("Could not send message. (%s)", str)); + g_free (str); + rtsp_message_unset (&message); ret = GST_FLOW_UNEXPECTED; goto need_pause; } @@ -1674,7 +1740,7 @@ invalid_length: { GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL), ("Short message received.")); - rtsp_message_unset (&response); + rtsp_message_unset (&message); return; } need_pause: @@ -1707,21 +1773,118 @@ need_pause: } } +/* send server keep-alive */ +static RTSPResult +gst_rtspsrc_send_keep_alive (GstRTSPSrc * src) +{ + RTSPMessage request = { 0 }; + RTSPMessage response = { 0 }; + RTSPResult res; + + GST_DEBUG_OBJECT (src, "creating server keep-alive"); + + res = + rtsp_message_init_request (&request, RTSP_GET_PARAMETER, + src->req_location); + if (res < 0) + goto send_error; + + if (!gst_rtspsrc_send (src, &request, &response, NULL)) + goto send_error; + + rtsp_message_unset (&request); + + return RTSP_OK; + + /* ERRORS */ +send_error: + { + gchar *str = rtsp_strresult (res); + + rtsp_message_unset (&request); + GST_ELEMENT_WARNING (src, RESOURCE, WRITE, (NULL), + ("Could not send keep-alive. (%s)", str)); + g_free (str); + return res; + } +} + static void gst_rtspsrc_loop_udp (GstRTSPSrc * src) { gboolean restart = FALSE; + RTSPResult res; GST_OBJECT_LOCK (src); if (src->loop_cmd == CMD_STOP) goto stopping; - /* FIXME, we should continue reading the TCP socket because the server might - * send us requests */ while (src->loop_cmd == CMD_WAIT) { - GST_DEBUG_OBJECT (src, "waiting"); - GST_RTSP_LOOP_WAIT (src); - GST_DEBUG_OBJECT (src, "waiting done"); + GTimeVal tv_timeout; + gint timeout; + + GST_OBJECT_UNLOCK (src); + + while (TRUE) { + RTSPMessage message = { 0 }; + + /* calculate the session timeout. We should send the keep-alive request a + * little earlier to compensate for the round trip time to the server. We + * subtract 1 second here. */ + timeout = src->connection->timeout; + if (timeout > 1) + timeout -= 1; + + /* use the session timeout for receiving data */ + tv_timeout.tv_sec = timeout; + tv_timeout.tv_usec = 0; + + GST_DEBUG_OBJECT (src, "doing receive with timeout %d seconds", timeout); + + /* we should continue reading the TCP socket because the server might + * send us requests. When the session timeout expires, we need to send a + * keep-alive request to keep the session open. */ + res = rtsp_connection_receive (src->connection, &message, &tv_timeout); + + switch (res) { + case RTSP_OK: + GST_DEBUG_OBJECT (src, "we received a server message"); + break; + case RTSP_EINTR: + /* we got interrupted, see what we have to do */ + GST_DEBUG_OBJECT (src, "we got interrupted"); + /* unset flushing so we can do something else */ + rtsp_connection_flush (src->connection, FALSE); + goto interrupt; + case RTSP_ETIMEOUT: + /* ignore result, a warning was posted */ + GST_DEBUG_OBJECT (src, "timout, sending keep-alive"); + res = gst_rtspsrc_send_keep_alive (src); + continue; + default: + goto receive_error; + } + + switch (message.type) { + case RTSP_MESSAGE_REQUEST: + /* server sends us a request message, handle it */ + if ((res = gst_rtspsrc_handle_request (src, &message)) < 0) + goto handle_request_failed; + break; + case RTSP_MESSAGE_RESPONSE: + case RTSP_MESSAGE_DATA: + /* we ignore response and data messages */ + GST_DEBUG_OBJECT (src, "ignoring message"); + break; + default: + GST_WARNING_OBJECT (src, "ignoring unknown message type %d", + message.type); + break; + } + } + interrupt: + GST_OBJECT_LOCK (src); + GST_DEBUG_OBJECT (src, "we have command %d", src->loop_cmd); if (src->loop_cmd == CMD_STOP) goto stopping; } @@ -1762,7 +1925,7 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) gst_rtspsrc_close (src); /* see if we have TCP left to try */ - if (!(src->cur_protocols & RTSP_LOWER_TRANS_TCP)) + if (!(src->protocols & RTSP_LOWER_TRANS_TCP)) goto no_protocols; /* open new connection using tcp */ @@ -1790,6 +1953,24 @@ stopping: gst_task_pause (src->task); return; } +receive_error: + { + gchar *str = rtsp_strresult (res); + + GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL), + ("Could not receive message. (%s)", str)); + g_free (str); + return; + } +handle_request_failed: + { + gchar *str = rtsp_strresult (res); + + GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL), + ("Could not handle server message. (%s)", str)); + g_free (str); + return; + } no_protocols: { src->cur_protocols = 0; @@ -1815,7 +1996,8 @@ gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd) { GST_OBJECT_LOCK (src); src->loop_cmd = cmd; - GST_RTSP_LOOP_SIGNAL (src); + if (cmd != CMD_WAIT) + rtsp_connection_flush (src->connection, TRUE); GST_OBJECT_UNLOCK (src); } @@ -1828,36 +2010,6 @@ gst_rtspsrc_loop (GstRTSPSrc * src) gst_rtspsrc_loop_udp (src); } -static RTSPResult -gst_rtspsrc_handle_request (GstRTSPSrc * src, RTSPMessage * request) -{ - RTSPMessage response = { 0 }; - RTSPResult res; - - res = rtsp_message_init_response (&response, RTSP_STS_OK, "OK", request); - if (res < 0) - goto send_error; - - if (src->debug) - rtsp_message_dump (&response); - - if ((res = rtsp_connection_send (src->connection, &response)) < 0) - goto send_error; - - return RTSP_OK; - - /* ERRORS */ -send_error: - { - gchar *str = rtsp_strresult (res); - - GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL), - ("Could not send message. (%s)", str)); - g_free (str); - return res; - } -} - #ifndef GST_DISABLE_GST_DEBUG const gchar * rtsp_auth_method_to_string (RTSPAuthMethod method) @@ -2085,11 +2237,11 @@ gst_rtspsrc_try_send (GstRTSPSrc * src, RTSPMessage * request, if (src->debug) rtsp_message_dump (request); - if ((res = rtsp_connection_send (src->connection, request)) < 0) + if ((res = rtsp_connection_send (src->connection, request, NULL)) < 0) goto send_error; next: - if ((res = rtsp_connection_receive (src->connection, response)) < 0) + if ((res = rtsp_connection_receive (src->connection, response, NULL)) < 0) goto receive_error; if (src->debug) @@ -2097,16 +2249,17 @@ next: switch (response->type) { case RTSP_MESSAGE_REQUEST: - /* FIXME, handle server request, reply with OK, for now */ if ((res = gst_rtspsrc_handle_request (src, response)) < 0) goto handle_request_failed; goto next; case RTSP_MESSAGE_RESPONSE: /* ok, a response is good */ + GST_DEBUG_OBJECT (src, "received response message"); break; default: case RTSP_MESSAGE_DATA: /* get next response */ + GST_DEBUG_OBJECT (src, "ignoring data response message"); goto next; } @@ -2587,7 +2740,7 @@ gst_rtspsrc_open (GstRTSPSrc * src) /* connect */ GST_DEBUG_OBJECT (src, "connecting (%s)...", src->req_location); - if ((res = rtsp_connection_connect (src->connection)) < 0) + if ((res = rtsp_connection_connect (src->connection, NULL)) < 0) goto could_not_connect; /* create OPTIONS */ @@ -3116,8 +3269,8 @@ gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message) gst_message_unref (message); break; - /* fatal our not our message, forward */ forward: + /* fatal but not our message, forward */ GST_BIN_CLASS (parent_class)->handle_message (bin, message); break; } |