summaryrefslogtreecommitdiffstats
path: root/gst/rtsp/gstrtspsrc.c
diff options
context:
space:
mode:
Diffstat (limited to 'gst/rtsp/gstrtspsrc.c')
-rw-r--r--gst/rtsp/gstrtspsrc.c281
1 files changed, 217 insertions, 64 deletions
diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c
index ee383a45..a99a102a 100644
--- a/gst/rtsp/gstrtspsrc.c
+++ b/gst/rtsp/gstrtspsrc.c
@@ -290,8 +290,6 @@ gst_rtspsrc_init (GstRTSPSrc * src, GstRTSPSrcClass * g_class)
src->stream_rec_lock = g_new (GStaticRecMutex, 1);
g_static_rec_mutex_init (src->stream_rec_lock);
- src->loop_cond = g_cond_new ();
-
src->location = g_strdup (DEFAULT_LOCATION);
src->url = NULL;
@@ -316,7 +314,6 @@ gst_rtspsrc_finalize (GObject * object)
g_static_rec_mutex_free (rtspsrc->stream_rec_lock);
g_free (rtspsrc->stream_rec_lock);
- g_cond_free (rtspsrc->loop_cond);
g_free (rtspsrc->location);
g_free (rtspsrc->req_location);
g_free (rtspsrc->content_base);
@@ -1280,8 +1277,8 @@ use_no_manager:
goto no_element;
/* take ownership */
- gst_object_ref (stream->udpsrc[0]);
- gst_object_sink (stream->udpsrc[0]);
+ gst_object_ref (stream->udpsrc[1]);
+ gst_object_sink (stream->udpsrc[1]);
gst_element_set_state (stream->udpsrc[1], GST_STATE_READY);
}
@@ -1350,9 +1347,11 @@ use_no_manager:
else
port = transport->server_port.max;
- destination = transport->destination;
+ /* first take the source, then the endpoint to figure out where to send
+ * the RTCP. */
+ destination = transport->source;
if (destination == NULL)
- destination = src->addr;
+ destination = src->connection->ip;
GST_DEBUG_OBJECT (src, "configure UDP sink for %s:%d", destination, port);
@@ -1375,7 +1374,7 @@ use_no_manager:
stream->rtcppad = gst_element_get_pad (stream->udpsink, "sink");
/* get session RTCP pad */
- name = g_strdup_printf ("rtcp_src_%d", stream->id);
+ name = g_strdup_printf ("send_rtcp_src_%d", stream->id);
pad = gst_element_get_request_pad (src->session, name);
g_free (name);
@@ -1557,10 +1556,43 @@ gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event)
gst_event_unref (event);
}
+/* FIXME, handle server request, reply with OK, for now */
+static RTSPResult
+gst_rtspsrc_handle_request (GstRTSPSrc * src, RTSPMessage * request)
+{
+ RTSPMessage response = { 0 };
+ RTSPResult res;
+
+ GST_DEBUG_OBJECT (src, "got server request message");
+
+ if (src->debug)
+ rtsp_message_dump (request);
+
+ res = rtsp_message_init_response (&response, RTSP_STS_OK, "OK", request);
+ if (res < 0)
+ goto send_error;
+
+ GST_DEBUG_OBJECT (src, "replying with OK");
+
+ if (src->debug)
+ rtsp_message_dump (&response);
+
+ if ((res = rtsp_connection_send (src->connection, &response, NULL)) < 0)
+ goto send_error;
+
+ return RTSP_OK;
+
+ /* ERRORS */
+send_error:
+ {
+ return res;
+ }
+}
+
static void
gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
{
- RTSPMessage response = { 0 };
+ RTSPMessage message = { 0 };
RTSPResult res;
gint channel;
GList *lstream;
@@ -1570,18 +1602,38 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
guint size;
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *buf;
- gboolean is_rtcp = FALSE;
+ gboolean is_rtcp, have_data;
+ have_data = FALSE;
do {
GST_DEBUG_OBJECT (src, "doing receive");
- if ((res = rtsp_connection_receive (src->connection, &response)) < 0)
+
+ if ((res = rtsp_connection_receive (src->connection, &message, NULL)) < 0)
goto receive_error;
- GST_DEBUG_OBJECT (src, "got packet type %d", response.type);
+ switch (message.type) {
+ case RTSP_MESSAGE_REQUEST:
+ /* server sends us a request message, handle it */
+ if ((res = gst_rtspsrc_handle_request (src, &message)) < 0)
+ goto handle_request_failed;
+ break;
+ case RTSP_MESSAGE_RESPONSE:
+ /* we ignore response messages */
+ GST_DEBUG_OBJECT (src, "ignoring response message");
+ break;
+ case RTSP_MESSAGE_DATA:
+ GST_DEBUG_OBJECT (src, "got data message");
+ have_data = TRUE;
+ break;
+ default:
+ GST_WARNING_OBJECT (src, "ignoring unknown message type %d",
+ message.type);
+ break;
+ }
}
- while (response.type != RTSP_MESSAGE_DATA);
+ while (!have_data);
- channel = response.type_data.data.channel;
+ channel = message.type_data.data.channel;
lstream = g_list_find_custom (src->streams, GINT_TO_POINTER (channel),
(GCompareFunc) find_stream_by_channel);
@@ -1591,13 +1643,16 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
stream = (GstRTSPStream *) lstream->data;
if (channel == stream->channel[0]) {
outpad = stream->channelpad[0];
+ is_rtcp = FALSE;
} else if (channel == stream->channel[1]) {
outpad = stream->channelpad[1];
is_rtcp = TRUE;
+ } else {
+ is_rtcp = FALSE;
}
/* take a look at the body to figure out what we have */
- rtsp_message_get_body (&response, &data, &size);
+ rtsp_message_get_body (&message, &data, &size);
if (size < 2)
goto invalid_length;
@@ -1612,8 +1667,8 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
if (outpad == NULL)
goto unknown_stream;
- /* and chain buffer to internal element */
- rtsp_message_steal_body (&response, &data, &size);
+ /* take the message body for further processing */
+ rtsp_message_steal_body (&message, &data, &size);
/* strip the trailing \0 */
size -= 1;
@@ -1624,7 +1679,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
GST_BUFFER_SIZE (buf) = size;
/* don't need message anymore */
- rtsp_message_unset (&response);
+ rtsp_message_unset (&message);
GST_DEBUG_OBJECT (src, "pushing data of size %d on channel %d", size,
channel);
@@ -1652,7 +1707,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
unknown_stream:
{
GST_DEBUG_OBJECT (src, "unknown stream on channel %d, ignored", channel);
- rtsp_message_unset (&response);
+ rtsp_message_unset (&message);
return;
}
receive_error:
@@ -1664,9 +1719,20 @@ receive_error:
g_free (str);
if (src->debug)
- rtsp_message_dump (&response);
+ rtsp_message_dump (&message);
- rtsp_message_unset (&response);
+ rtsp_message_unset (&message);
+ ret = GST_FLOW_UNEXPECTED;
+ goto need_pause;
+ }
+handle_request_failed:
+ {
+ gchar *str = rtsp_strresult (res);
+
+ GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL),
+ ("Could not send message. (%s)", str));
+ g_free (str);
+ rtsp_message_unset (&message);
ret = GST_FLOW_UNEXPECTED;
goto need_pause;
}
@@ -1674,7 +1740,7 @@ invalid_length:
{
GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
("Short message received."));
- rtsp_message_unset (&response);
+ rtsp_message_unset (&message);
return;
}
need_pause:
@@ -1707,21 +1773,118 @@ need_pause:
}
}
+/* send server keep-alive */
+static RTSPResult
+gst_rtspsrc_send_keep_alive (GstRTSPSrc * src)
+{
+ RTSPMessage request = { 0 };
+ RTSPMessage response = { 0 };
+ RTSPResult res;
+
+ GST_DEBUG_OBJECT (src, "creating server keep-alive");
+
+ res =
+ rtsp_message_init_request (&request, RTSP_GET_PARAMETER,
+ src->req_location);
+ if (res < 0)
+ goto send_error;
+
+ if (!gst_rtspsrc_send (src, &request, &response, NULL))
+ goto send_error;
+
+ rtsp_message_unset (&request);
+
+ return RTSP_OK;
+
+ /* ERRORS */
+send_error:
+ {
+ gchar *str = rtsp_strresult (res);
+
+ rtsp_message_unset (&request);
+ GST_ELEMENT_WARNING (src, RESOURCE, WRITE, (NULL),
+ ("Could not send keep-alive. (%s)", str));
+ g_free (str);
+ return res;
+ }
+}
+
static void
gst_rtspsrc_loop_udp (GstRTSPSrc * src)
{
gboolean restart = FALSE;
+ RTSPResult res;
GST_OBJECT_LOCK (src);
if (src->loop_cmd == CMD_STOP)
goto stopping;
- /* FIXME, we should continue reading the TCP socket because the server might
- * send us requests */
while (src->loop_cmd == CMD_WAIT) {
- GST_DEBUG_OBJECT (src, "waiting");
- GST_RTSP_LOOP_WAIT (src);
- GST_DEBUG_OBJECT (src, "waiting done");
+ GTimeVal tv_timeout;
+ gint timeout;
+
+ GST_OBJECT_UNLOCK (src);
+
+ while (TRUE) {
+ RTSPMessage message = { 0 };
+
+ /* calculate the session timeout. We should send the keep-alive request a
+ * little earlier to compensate for the round trip time to the server. We
+ * subtract 1 second here. */
+ timeout = src->connection->timeout;
+ if (timeout > 1)
+ timeout -= 1;
+
+ /* use the session timeout for receiving data */
+ tv_timeout.tv_sec = timeout;
+ tv_timeout.tv_usec = 0;
+
+ GST_DEBUG_OBJECT (src, "doing receive with timeout %d seconds", timeout);
+
+ /* we should continue reading the TCP socket because the server might
+ * send us requests. When the session timeout expires, we need to send a
+ * keep-alive request to keep the session open. */
+ res = rtsp_connection_receive (src->connection, &message, &tv_timeout);
+
+ switch (res) {
+ case RTSP_OK:
+ GST_DEBUG_OBJECT (src, "we received a server message");
+ break;
+ case RTSP_EINTR:
+ /* we got interrupted, see what we have to do */
+ GST_DEBUG_OBJECT (src, "we got interrupted");
+ /* unset flushing so we can do something else */
+ rtsp_connection_flush (src->connection, FALSE);
+ goto interrupt;
+ case RTSP_ETIMEOUT:
+ /* ignore result, a warning was posted */
+ GST_DEBUG_OBJECT (src, "timout, sending keep-alive");
+ res = gst_rtspsrc_send_keep_alive (src);
+ continue;
+ default:
+ goto receive_error;
+ }
+
+ switch (message.type) {
+ case RTSP_MESSAGE_REQUEST:
+ /* server sends us a request message, handle it */
+ if ((res = gst_rtspsrc_handle_request (src, &message)) < 0)
+ goto handle_request_failed;
+ break;
+ case RTSP_MESSAGE_RESPONSE:
+ case RTSP_MESSAGE_DATA:
+ /* we ignore response and data messages */
+ GST_DEBUG_OBJECT (src, "ignoring message");
+ break;
+ default:
+ GST_WARNING_OBJECT (src, "ignoring unknown message type %d",
+ message.type);
+ break;
+ }
+ }
+ interrupt:
+ GST_OBJECT_LOCK (src);
+ GST_DEBUG_OBJECT (src, "we have command %d", src->loop_cmd);
if (src->loop_cmd == CMD_STOP)
goto stopping;
}
@@ -1762,7 +1925,7 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
gst_rtspsrc_close (src);
/* see if we have TCP left to try */
- if (!(src->cur_protocols & RTSP_LOWER_TRANS_TCP))
+ if (!(src->protocols & RTSP_LOWER_TRANS_TCP))
goto no_protocols;
/* open new connection using tcp */
@@ -1790,6 +1953,24 @@ stopping:
gst_task_pause (src->task);
return;
}
+receive_error:
+ {
+ gchar *str = rtsp_strresult (res);
+
+ GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
+ ("Could not receive message. (%s)", str));
+ g_free (str);
+ return;
+ }
+handle_request_failed:
+ {
+ gchar *str = rtsp_strresult (res);
+
+ GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
+ ("Could not handle server message. (%s)", str));
+ g_free (str);
+ return;
+ }
no_protocols:
{
src->cur_protocols = 0;
@@ -1815,7 +1996,8 @@ gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd)
{
GST_OBJECT_LOCK (src);
src->loop_cmd = cmd;
- GST_RTSP_LOOP_SIGNAL (src);
+ if (cmd != CMD_WAIT)
+ rtsp_connection_flush (src->connection, TRUE);
GST_OBJECT_UNLOCK (src);
}
@@ -1828,36 +2010,6 @@ gst_rtspsrc_loop (GstRTSPSrc * src)
gst_rtspsrc_loop_udp (src);
}
-static RTSPResult
-gst_rtspsrc_handle_request (GstRTSPSrc * src, RTSPMessage * request)
-{
- RTSPMessage response = { 0 };
- RTSPResult res;
-
- res = rtsp_message_init_response (&response, RTSP_STS_OK, "OK", request);
- if (res < 0)
- goto send_error;
-
- if (src->debug)
- rtsp_message_dump (&response);
-
- if ((res = rtsp_connection_send (src->connection, &response)) < 0)
- goto send_error;
-
- return RTSP_OK;
-
- /* ERRORS */
-send_error:
- {
- gchar *str = rtsp_strresult (res);
-
- GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL),
- ("Could not send message. (%s)", str));
- g_free (str);
- return res;
- }
-}
-
#ifndef GST_DISABLE_GST_DEBUG
const gchar *
rtsp_auth_method_to_string (RTSPAuthMethod method)
@@ -2085,11 +2237,11 @@ gst_rtspsrc_try_send (GstRTSPSrc * src, RTSPMessage * request,
if (src->debug)
rtsp_message_dump (request);
- if ((res = rtsp_connection_send (src->connection, request)) < 0)
+ if ((res = rtsp_connection_send (src->connection, request, NULL)) < 0)
goto send_error;
next:
- if ((res = rtsp_connection_receive (src->connection, response)) < 0)
+ if ((res = rtsp_connection_receive (src->connection, response, NULL)) < 0)
goto receive_error;
if (src->debug)
@@ -2097,16 +2249,17 @@ next:
switch (response->type) {
case RTSP_MESSAGE_REQUEST:
- /* FIXME, handle server request, reply with OK, for now */
if ((res = gst_rtspsrc_handle_request (src, response)) < 0)
goto handle_request_failed;
goto next;
case RTSP_MESSAGE_RESPONSE:
/* ok, a response is good */
+ GST_DEBUG_OBJECT (src, "received response message");
break;
default:
case RTSP_MESSAGE_DATA:
/* get next response */
+ GST_DEBUG_OBJECT (src, "ignoring data response message");
goto next;
}
@@ -2587,7 +2740,7 @@ gst_rtspsrc_open (GstRTSPSrc * src)
/* connect */
GST_DEBUG_OBJECT (src, "connecting (%s)...", src->req_location);
- if ((res = rtsp_connection_connect (src->connection)) < 0)
+ if ((res = rtsp_connection_connect (src->connection, NULL)) < 0)
goto could_not_connect;
/* create OPTIONS */
@@ -3116,8 +3269,8 @@ gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message)
gst_message_unref (message);
break;
- /* fatal our not our message, forward */
forward:
+ /* fatal but not our message, forward */
GST_BIN_CLASS (parent_class)->handle_message (bin, message);
break;
}