diff options
author | Wim Taymans <wim.taymans@gmail.com> | 2007-05-02 19:32:58 +0000 |
---|---|---|
committer | Wim Taymans <wim.taymans@gmail.com> | 2007-05-02 19:32:58 +0000 |
commit | 24e51b3c737dd8b01c02e48948afb1e302a195e7 (patch) | |
tree | be2b6325896647f0939ea31987ab49c29dcad6ad | |
parent | 13ae0cde514f3531c5c39722ca05f96377a4d3d5 (diff) |
gst/rtsp/gstrtspsrc.*: Fix race when multiple udp sources post timeouts, just act on the first received timeout.
Original commit message from CVS:
* gst/rtsp/gstrtspsrc.c: (gst_rtspsrc_init),
(gst_rtspsrc_finalize), (new_session_pad), (request_pt_map),
(gst_rtspsrc_loop_send_cmd), (gst_rtspsrc_try_send),
(gst_rtspsrc_send), (gst_rtspsrc_async_open), (gst_rtspsrc_close),
(gst_rtspsrc_play), (gst_rtspsrc_handle_message),
(gst_rtspsrc_change_state):
* gst/rtsp/gstrtspsrc.h:
Fix race when multiple udp sources post timeouts, just act on the first
received timeout.
Protect stream list with a recursive lock to fix some races.
Flush connection when we need to do a reconnect or stop.
Make state lock recursive.
* gst/rtsp/rtspconnection.c: (rtsp_connection_connect),
(rtsp_connection_close):
Some small cleanups.
-rw-r--r-- | ChangeLog | 19 | ||||
-rw-r--r-- | gst/rtsp/gstrtspsrc.c | 242 | ||||
-rw-r--r-- | gst/rtsp/gstrtspsrc.h | 17 | ||||
-rw-r--r-- | gst/rtsp/rtspconnection.c | 6 |
4 files changed, 182 insertions, 102 deletions
@@ -1,5 +1,24 @@ 2007-05-02 Wim Taymans <wim@fluendo.com> + * gst/rtsp/gstrtspsrc.c: (gst_rtspsrc_init), + (gst_rtspsrc_finalize), (new_session_pad), (request_pt_map), + (gst_rtspsrc_loop_send_cmd), (gst_rtspsrc_try_send), + (gst_rtspsrc_send), (gst_rtspsrc_async_open), (gst_rtspsrc_close), + (gst_rtspsrc_play), (gst_rtspsrc_handle_message), + (gst_rtspsrc_change_state): + * gst/rtsp/gstrtspsrc.h: + Fix race when multiple udp sources post timeouts, just act on the first + received timeout. + Protect stream list with a recursive lock to fix some races. + Flush connection when we need to do a reconnect or stop. + Make state lock recursive. + + * gst/rtsp/rtspconnection.c: (rtsp_connection_connect), + (rtsp_connection_close): + Some small cleanups. + +2007-05-02 Wim Taymans <wim@fluendo.com> + * gst/wavparse/gstwavparse.c: (gst_wavparse_perform_seek), (gst_wavparse_stream_headers), (gst_wavparse_stream_data): Only set DISCONT when there actually is a discont or when we just diff --git a/gst/rtsp/gstrtspsrc.c b/gst/rtsp/gstrtspsrc.c index a99a102a..65cb3c9b 100644 --- a/gst/rtsp/gstrtspsrc.c +++ b/gst/rtsp/gstrtspsrc.c @@ -179,12 +179,6 @@ static GstStateChangeReturn gst_rtspsrc_change_state (GstElement * element, GstStateChange transition); static void gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message); -static gboolean gst_rtspsrc_setup_auth (GstRTSPSrc * src, - RTSPMessage * response); -static gboolean gst_rtspsrc_try_send (GstRTSPSrc * src, RTSPMessage * request, - RTSPMessage * response, RTSPStatusCode * code); - - static gboolean gst_rtspsrc_open (GstRTSPSrc * src); static gboolean gst_rtspsrc_play (GstRTSPSrc * src); static gboolean gst_rtspsrc_pause (GstRTSPSrc * src); @@ -301,7 +295,8 @@ gst_rtspsrc_init (GstRTSPSrc * src, GstRTSPSrcClass * g_class) #endif src->extension->src = (gpointer) src; - src->state_lock = g_mutex_new (); + src->state_rec_lock = g_new (GStaticRecMutex, 1); + g_static_rec_mutex_init (src->state_rec_lock); src->state = RTSP_STATE_INVALID; } @@ -319,7 +314,8 @@ gst_rtspsrc_finalize (GObject * object) g_free (rtspsrc->content_base); rtsp_url_free (rtspsrc->url); g_free (rtspsrc->addr); - g_mutex_free (rtspsrc->state_lock); + g_static_rec_mutex_free (rtspsrc->state_rec_lock); + g_free (rtspsrc->state_rec_lock); if (rtspsrc->extension) { #ifdef WITH_EXT_REAL @@ -1050,9 +1046,11 @@ new_session_pad (GstElement * session, GstPad * pad, GstRTSPSrc * src) gint id, ssrc, pt; GList *lstream; GstRTSPStream *stream; + gboolean all_added; GST_DEBUG_OBJECT (src, "got new session pad %" GST_PTR_FORMAT, pad); + GST_RTSP_STATE_LOCK (src); /* find stream */ name = gst_object_get_name (GST_OBJECT_CAST (pad)); if (sscanf (name, "recv_rtp_src_%d_%d_%d", &id, &ssrc, &pt) != 3) @@ -1079,23 +1077,30 @@ new_session_pad (GstElement * session, GstPad * pad, GstRTSPSrc * src) gst_element_add_pad (GST_ELEMENT_CAST (src), stream->srcpad); /* check if we added all streams */ + all_added = TRUE; for (lstream = src->streams; lstream; lstream = g_list_next (lstream)) { stream = (GstRTSPStream *) lstream->data; - if (!stream->added) - goto done; + if (!stream->added) { + all_added = FALSE; + break; + } + } + GST_RTSP_STATE_UNLOCK (src); + + if (all_added) { + GST_DEBUG_OBJECT (src, "We added all streams"); + /* when we get here, all stream are added and we can fire the no-more-pads + * signal. */ + gst_element_no_more_pads (GST_ELEMENT_CAST (src)); } - GST_DEBUG_OBJECT (src, "We added all streams"); - /* when we get here, all stream are added and we can fire the no-more-pads - * signal. */ - gst_element_no_more_pads (GST_ELEMENT_CAST (src)); -done: return; /* ERRORS */ unknown_stream: { GST_DEBUG_OBJECT (src, "ignoring unknown stream"); + GST_RTSP_STATE_UNLOCK (src); g_free (name); return; } @@ -1106,21 +1111,26 @@ request_pt_map (GstElement * sess, guint session, guint pt, GstRTSPSrc * src) { GstRTSPStream *stream; GList *lstream; + GstCaps *caps; GST_DEBUG_OBJECT (src, "getting pt map for pt %d in session %d", pt, session); + GST_RTSP_STATE_LOCK (src); lstream = g_list_find_custom (src->streams, GINT_TO_POINTER (session), (GCompareFunc) find_stream_by_id); if (!lstream) goto unknown_stream; stream = (GstRTSPStream *) lstream->data; + caps = stream->caps; + GST_RTSP_STATE_UNLOCK (src); - return stream->caps; + return caps; unknown_stream: { GST_DEBUG_OBJECT (src, "unknown stream %d", session); + GST_RTSP_STATE_UNLOCK (src); return NULL; } } @@ -1852,7 +1862,7 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src) break; case RTSP_EINTR: /* we got interrupted, see what we have to do */ - GST_DEBUG_OBJECT (src, "we got interrupted"); + GST_DEBUG_OBJECT (src, "we got interrupted, unset flushing"); /* unset flushing so we can do something else */ rtsp_connection_flush (src->connection, FALSE); goto interrupt; @@ -1992,12 +2002,14 @@ play_failed: } static void -gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd) +gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd, gboolean flush) { GST_OBJECT_LOCK (src); src->loop_cmd = cmd; - if (cmd != CMD_WAIT) + if (flush) { + GST_DEBUG_OBJECT (src, "start flush"); rtsp_connection_flush (src->connection, TRUE); + } GST_OBJECT_UNLOCK (src); } @@ -2153,76 +2165,6 @@ no_user_pass: } } -/** - * gst_rtspsrc_send: - * @src: the rtsp source - * @request: must point to a valid request - * @response: must point to an empty #RTSPMessage - * - * send @request and retrieve the response in @response. optionally @code can be - * non-NULL in which case it will contain the status code of the response. - * - * If This function returns TRUE, @response will contain a valid response - * message that should be cleaned with rtsp_message_unset() after usage. - * - * If @code is NULL, this function will return FALSE (with an invalid @response - * message) if the response code was not 200 (OK). - * - * If the attempt results in an authentication failure, then this will attempt - * to retrieve authentication credentials via gst_rtspsrc_setup_auth and retry - * the request. - * - * Returns: TRUE if the processing was successful. - */ -gboolean -gst_rtspsrc_send (GstRTSPSrc * src, RTSPMessage * request, - RTSPMessage * response, RTSPStatusCode * code) -{ - RTSPStatusCode int_code = RTSP_STS_OK; - gboolean res; - gboolean retry; - - do { - retry = FALSE; - res = gst_rtspsrc_try_send (src, request, response, &int_code); - - if (int_code == RTSP_STS_UNAUTHORIZED) { - if (gst_rtspsrc_setup_auth (src, response)) { - /* Try the request/response again after configuring the auth info - * and loop again */ - retry = TRUE; - } - } - } while (retry == TRUE); - - /* If the user requested the code, let them handle errors, otherwise - * post an error below */ - if (code != NULL) - *code = int_code; - else if (int_code != RTSP_STS_OK) - goto error_response; - - return res; - -error_response: - { - switch (response->type_data.response.code) { - case RTSP_STS_NOT_FOUND: - GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL), ("%s", - response->type_data.response.reason)); - break; - default: - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("Got error response: %d (%s).", response->type_data.response.code, - response->type_data.response.reason)); - break; - } - /* we return FALSE so we should unset the response ourselves */ - rtsp_message_unset (response); - return FALSE; - } -} - static gboolean gst_rtspsrc_try_send (GstRTSPSrc * src, RTSPMessage * request, RTSPMessage * response, RTSPStatusCode * code) @@ -2234,6 +2176,8 @@ gst_rtspsrc_try_send (GstRTSPSrc * src, RTSPMessage * request, if (src->extension && src->extension->before_send) src->extension->before_send (src->extension, request); + GST_DEBUG_OBJECT (src, "sending message"); + if (src->debug) rtsp_message_dump (request); @@ -2309,6 +2253,76 @@ handle_request_failed: } } +/** + * gst_rtspsrc_send: + * @src: the rtsp source + * @request: must point to a valid request + * @response: must point to an empty #RTSPMessage + * + * send @request and retrieve the response in @response. optionally @code can be + * non-NULL in which case it will contain the status code of the response. + * + * If This function returns TRUE, @response will contain a valid response + * message that should be cleaned with rtsp_message_unset() after usage. + * + * If @code is NULL, this function will return FALSE (with an invalid @response + * message) if the response code was not 200 (OK). + * + * If the attempt results in an authentication failure, then this will attempt + * to retrieve authentication credentials via gst_rtspsrc_setup_auth and retry + * the request. + * + * Returns: TRUE if the processing was successful. + */ +gboolean +gst_rtspsrc_send (GstRTSPSrc * src, RTSPMessage * request, + RTSPMessage * response, RTSPStatusCode * code) +{ + RTSPStatusCode int_code = RTSP_STS_OK; + gboolean res; + gboolean retry; + + do { + retry = FALSE; + res = gst_rtspsrc_try_send (src, request, response, &int_code); + + if (int_code == RTSP_STS_UNAUTHORIZED) { + if (gst_rtspsrc_setup_auth (src, response)) { + /* Try the request/response again after configuring the auth info + * and loop again */ + retry = TRUE; + } + } + } while (retry == TRUE); + + /* If the user requested the code, let them handle errors, otherwise + * post an error below */ + if (code != NULL) + *code = int_code; + else if (int_code != RTSP_STS_OK) + goto error_response; + + return res; + +error_response: + { + switch (response->type_data.response.code) { + case RTSP_STS_NOT_FOUND: + GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL), ("%s", + response->type_data.response.reason)); + break; + default: + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("Got error response: %d (%s).", response->type_data.response.code, + response->type_data.response.reason)); + break; + } + /* we return FALSE so we should unset the response ourselves */ + rtsp_message_unset (response); + return FALSE; + } +} + /* parse the response and collect all the supported methods. We need this * information so that we don't try to send an unsupported request to the * server. @@ -2896,6 +2910,23 @@ cleanup_error: } } +#if 0 +static gboolean +gst_rtspsrc_async_open (GstRTSPSrc * src) +{ + GError *error = NULL; + gboolean res = TRUE; + + src->thread = + g_thread_create ((GThreadFunc) gst_rtspsrc_open, src, TRUE, &error); + if (error != NULL) { + GST_ELEMENT_ERROR (src, RESOURCE, INIT, (NULL), + ("Could not start async thread (%s).", error->message)); + } + return res; +} +#endif + static gboolean gst_rtspsrc_close (GstRTSPSrc * src) { @@ -2907,15 +2938,15 @@ gst_rtspsrc_close (GstRTSPSrc * src) GST_RTSP_STATE_LOCK (src); - gst_rtspsrc_loop_send_cmd (src, CMD_STOP); + gst_rtspsrc_loop_send_cmd (src, CMD_STOP, TRUE); /* stop task if any */ if (src->task) { gst_task_stop (src->task); /* make sure it is not running */ - g_static_rec_mutex_lock (src->stream_rec_lock); - g_static_rec_mutex_unlock (src->stream_rec_lock); + GST_RTSP_STREAM_LOCK (src); + GST_RTSP_STREAM_UNLOCK (src); /* no wait for the task to finish */ gst_task_join (src->task); @@ -2925,6 +2956,9 @@ gst_rtspsrc_close (GstRTSPSrc * src) src->task = NULL; } + GST_DEBUG_OBJECT (src, "stop flush"); + rtsp_connection_flush (src->connection, FALSE); + if (src->methods & RTSP_PLAY) { /* do TEARDOWN */ res = @@ -3096,7 +3130,6 @@ gst_rtspsrc_play (GstRTSPSrc * src) * Play Time) and should be put in the NEWSEGMENT position field. */ rtsp_message_get_header (&response, RTSP_HDR_RANGE, &range); - /* parse the RTP-Info header field (if ANY) to get the base seqnum and timestamp * for the RTP packets. If this is not present, we assume all starts from 0... * FIXME, this is info for the RTP session manager ideally. */ @@ -3111,11 +3144,11 @@ gst_rtspsrc_play (GstRTSPSrc * src) * For UDP we start the task as well to look for server info and UDP timeouts. */ if (src->task == NULL) { src->task = gst_task_create ((GstTaskFunction) gst_rtspsrc_loop, src); - gst_task_set_lock (src->task, src->stream_rec_lock); + gst_task_set_lock (src->task, GST_RTSP_STREAM_GET_LOCK (src)); } src->running = TRUE; src->state = RTSP_STATE_PLAYING; - gst_rtspsrc_loop_send_cmd (src, CMD_WAIT); + gst_rtspsrc_loop_send_cmd (src, CMD_WAIT, FALSE); gst_task_start (src->task); done: @@ -3227,8 +3260,21 @@ gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message) const GstStructure *s = gst_message_get_structure (message); if (gst_structure_has_name (s, "GstUDPSrcTimeout")) { + gboolean ignore_timeout; + GST_DEBUG_OBJECT (bin, "timeout on UDP port"); - gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_RECONNECT); + + GST_OBJECT_LOCK (rtspsrc); + ignore_timeout = rtspsrc->ignore_timeout; + rtspsrc->ignore_timeout = TRUE; + GST_OBJECT_UNLOCK (rtspsrc); + + /* we only act on the first udp timeout message, others are irrelevant + * and can be ignored. */ + if (ignore_timeout) + gst_message_unref (message); + else + gst_rtspsrc_loop_send_cmd (rtspsrc, CMD_RECONNECT, TRUE); return; } GST_BIN_CLASS (parent_class)->handle_message (bin, message); @@ -3300,10 +3346,13 @@ gst_rtspsrc_change_state (GstElement * element, GstStateChange transition) break; case GST_STATE_CHANGE_READY_TO_PAUSED: rtspsrc->cur_protocols = rtspsrc->protocols; + /* first attempt, don't ignore timeouts */ + rtspsrc->ignore_timeout = FALSE; if (!gst_rtspsrc_open (rtspsrc)) goto open_failed; break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + GST_DEBUG_OBJECT (rtspsrc, "stop flush"); rtsp_connection_flush (rtspsrc->connection, FALSE); /* FIXME, the server might send UDP packets before we activate the UDP * ports */ @@ -3311,6 +3360,7 @@ gst_rtspsrc_change_state (GstElement * element, GstStateChange transition) break; case GST_STATE_CHANGE_PLAYING_TO_PAUSED: case GST_STATE_CHANGE_PAUSED_TO_READY: + GST_DEBUG_OBJECT (rtspsrc, "start flush"); rtsp_connection_flush (rtspsrc->connection, TRUE); break; default: diff --git a/gst/rtsp/gstrtspsrc.h b/gst/rtsp/gstrtspsrc.h index 96569bb4..f2221f7c 100644 --- a/gst/rtsp/gstrtspsrc.h +++ b/gst/rtsp/gstrtspsrc.h @@ -67,9 +67,13 @@ G_BEGIN_DECLS typedef struct _GstRTSPSrc GstRTSPSrc; typedef struct _GstRTSPSrcClass GstRTSPSrcClass; -#define GST_RTSP_STATE_GET_LOCK(rtsp) (GST_RTSPSRC_CAST(rtsp)->state_lock) -#define GST_RTSP_STATE_LOCK(rtsp) (g_mutex_lock (GST_RTSP_STATE_GET_LOCK(rtsp))) -#define GST_RTSP_STATE_UNLOCK(rtsp) (g_mutex_unlock (GST_RTSP_STATE_GET_LOCK(rtsp))) +#define GST_RTSP_STATE_GET_LOCK(rtsp) (GST_RTSPSRC_CAST(rtsp)->state_rec_lock) +#define GST_RTSP_STATE_LOCK(rtsp) (g_static_rec_mutex_lock (GST_RTSP_STATE_GET_LOCK(rtsp))) +#define GST_RTSP_STATE_UNLOCK(rtsp) (g_static_rec_mutex_unlock (GST_RTSP_STATE_GET_LOCK(rtsp))) + +#define GST_RTSP_STREAM_GET_LOCK(rtsp) (GST_RTSPSRC_CAST(rtsp)->stream_rec_lock) +#define GST_RTSP_STREAM_LOCK(rtsp) (g_static_rec_mutex_lock (GST_RTSP_STREAM_GET_LOCK(rtsp))) +#define GST_RTSP_STREAM_UNLOCK(rtsp) (g_static_rec_mutex_unlock (GST_RTSP_STREAM_GET_LOCK(rtsp))) typedef struct _GstRTSPStream GstRTSPStream; @@ -121,9 +125,12 @@ struct _GstRTSPSrc { gboolean running; gint free_channel; - /* cond to signal loop */ + /* UDP mode loop */ gint loop_cmd; - GMutex *state_lock; + gboolean ignore_timeout; + + /* mutex for protecting state changes */ + GStaticRecMutex *state_rec_lock; gint numstreams; GList *streams; diff --git a/gst/rtsp/rtspconnection.c b/gst/rtsp/rtspconnection.c index e2d375e6..e5ad277b 100644 --- a/gst/rtsp/rtspconnection.c +++ b/gst/rtsp/rtspconnection.c @@ -205,6 +205,9 @@ rtsp_connection_connect (RTSPConnection * conn, GTimeVal * timeout) if (fd == -1) goto sys_error; + /* set to non-blocking mode so that we can cancel the connect */ + //fcntl (fd, F_SETFL, O_NONBLOCK); + ret = connect (fd, (struct sockaddr *) &sin, sizeof (sin)); if (ret != 0) goto sys_error; @@ -216,6 +219,8 @@ rtsp_connection_connect (RTSPConnection * conn, GTimeVal * timeout) sys_error: { + if (fd != -1) + CLOSE_SOCKET (fd); return RTSP_ESYS; } not_resolved: @@ -828,7 +833,6 @@ rtsp_connection_close (RTSPConnection * conn) gint res; g_return_val_if_fail (conn != NULL, RTSP_EINVAL); - g_return_val_if_fail (conn->fd >= 0, RTSP_EINVAL); if (conn->fd != -1) { res = CLOSE_SOCKET (conn->fd); |