diff options
author | Peter Kjellerstedt <pkj@axis.com> | 2008-02-28 11:51:24 +0000 |
---|---|---|
committer | Wim Taymans <wim.taymans@gmail.com> | 2008-02-28 11:51:24 +0000 |
commit | 9c814472e56ed0a6c70755cf9c70860a5fb32e08 (patch) | |
tree | 79b2ef003eab9c71c5d7829abc84c40fa36cc59f /gst/udp | |
parent | 593528c5f6bde202cf4f420afd077084ca153dd3 (diff) |
gst/udp/gstudpsrc.*: Port to GstPoll. See #505417.
Original commit message from CVS:
Patch by: Peter Kjellerstedt <pkj at axis com>
* gst/udp/gstudpsrc.c: (gst_udpsrc_init), (gst_udpsrc_create),
(gst_udpsrc_get_property), (gst_udpsrc_start), (gst_udpsrc_unlock),
(gst_udpsrc_unlock_stop), (gst_udpsrc_stop):
* gst/udp/gstudpsrc.h:
Port to GstPoll. See #505417.
Diffstat (limited to 'gst/udp')
-rw-r--r-- | gst/udp/gstudpsrc.c | 180 | ||||
-rw-r--r-- | gst/udp/gstudpsrc.h | 26 |
2 files changed, 72 insertions, 134 deletions
diff --git a/gst/udp/gstudpsrc.c b/gst/udp/gstudpsrc.c index 8964f837..a7272501 100644 --- a/gst/udp/gstudpsrc.c +++ b/gst/udp/gstudpsrc.c @@ -141,29 +141,12 @@ typedef int socklen_t; GST_DEBUG_CATEGORY_STATIC (udpsrc_debug); #define GST_CAT_DEFAULT (udpsrc_debug) -/* the select call is also performed on the control sockets, that way - * we can send special commands to unblock or restart the select call */ -#define CONTROL_RESTART 'R' /* restart the select call */ -#define CONTROL_STOP 'S' /* stop the select call */ -#define CONTROL_SOCKETS(src) src->control_sock -#define WRITE_SOCKET(src) src->control_sock[1] -#define READ_SOCKET(src) src->control_sock[0] - -#define SEND_COMMAND(src, command, res) \ -G_STMT_START { \ - unsigned char c; c = command; \ - res = write (WRITE_SOCKET(src), &c, 1); \ -} G_STMT_END - -#define READ_COMMAND(src, command, res) \ -G_STMT_START { \ - res = read(READ_SOCKET(src), &command, 1); \ -} G_STMT_END - #define CLOSE_IF_REQUESTED(udpctx) \ +G_STMT_START { \ if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) \ - CLOSE_SOCKET(udpctx->sock); \ - udpctx->sock = -1; + CLOSE_SOCKET(udpctx->sock.fd); \ + udpctx->sock.fd = -1; \ +} G_STMT_END static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, @@ -326,9 +309,7 @@ gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class) udpsrc->closefd = UDP_DEFAULT_CLOSEFD; udpsrc->externalfd = (udpsrc->sockfd != -1); - udpsrc->sock = UDP_DEFAULT_SOCK; - udpsrc->control_sock[0] = -1; - udpsrc->control_sock[1] = -1; + udpsrc->sock.fd = UDP_DEFAULT_SOCK; gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME); gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE); } @@ -368,8 +349,6 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf) GstNetBuffer *outbuf; struct sockaddr_in tmpaddr; socklen_t len; - fd_set read_fds; - guint max_sock; guint8 *pktdata; gint pktsize; @@ -378,6 +357,7 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf) #elif defined G_OS_WIN32 gulong readsize; #endif + GstClockTime timeout; gint ret; gboolean try_again; @@ -386,49 +366,29 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf) retry: /* quick check, avoid going in select when we already have data */ readsize = 0; - if ((ret = IOCTL_SOCKET (udpsrc->sock, FIONREAD, &readsize)) < 0) + if ((ret = IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0) goto ioctl_failed; if (readsize > 0) goto no_select; - do { - gboolean stop; - struct timeval timeval, *timeout; - - FD_ZERO (&read_fds); - FD_SET (udpsrc->sock, &read_fds); -#ifndef G_OS_WIN32 - FD_SET (READ_SOCKET (udpsrc), &read_fds); -#endif - max_sock = MAX (udpsrc->sock, READ_SOCKET (udpsrc)); + if (udpsrc->timeout > 0) { + timeout = udpsrc->timeout * GST_USECOND; + } else { + timeout = GST_CLOCK_TIME_NONE; + } + do { try_again = FALSE; - stop = FALSE; GST_LOG_OBJECT (udpsrc, "doing select, timeout %" G_GUINT64_FORMAT, udpsrc->timeout); - if (udpsrc->timeout > 0) { - timeval.tv_sec = udpsrc->timeout / 1000000; - timeval.tv_usec = udpsrc->timeout % 1000000; - timeout = &timeval; - } else { - timeout = NULL; - } - -#ifdef G_OS_WIN32 - if (((max_sock + 1) != READ_SOCKET (udpsrc)) || - ((max_sock + 1) != WRITE_SOCKET (udpsrc))) { - ret = select (max_sock + 1, &read_fds, NULL, NULL, timeout); - } else { - ret = 1; - } -#else - ret = select (max_sock + 1, &read_fds, NULL, NULL, timeout); -#endif + ret = gst_poll_wait (udpsrc->fdset, timeout); GST_LOG_OBJECT (udpsrc, "select returned %d", ret); if (ret < 0) { + if (errno == EBUSY) + goto stopped; #ifdef G_OS_WIN32 if (WSAGetLastError () != WSAEINTR) goto select_error; @@ -444,9 +404,6 @@ retry: gst_structure_new ("GstUDPSrcTimeout", "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL))); try_again = TRUE; - } else { - if (FD_ISSET (READ_SOCKET (udpsrc), &read_fds)) - goto stopped; } } while (try_again); @@ -454,7 +411,7 @@ retry: * one UDP packet. We will check the return value, though, because in some * case it can return 0 and we don't want a 0 sized buffer. */ readsize = 0; - if ((ret = IOCTL_SOCKET (udpsrc->sock, FIONREAD, &readsize)) < 0) + if ((ret = IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0) goto ioctl_failed; /* if we get here and there is nothing to read from the socket, the select got @@ -472,7 +429,7 @@ no_select: while (TRUE) { len = sizeof (struct sockaddr); - ret = recvfrom (udpsrc->sock, pktdata, pktsize, + ret = recvfrom (udpsrc->sock.fd, pktdata, pktsize, 0, (struct sockaddr *) &tmpaddr, &len); if (ret < 0) { if (errno != EAGAIN && errno != EINTR) @@ -690,7 +647,7 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value, g_value_set_boolean (value, udpsrc->closefd); break; case PROP_SOCK: - g_value_set_int (value, udpsrc->sock); + g_value_set_int (value, udpsrc->sock.fd); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -713,22 +670,6 @@ gst_udpsrc_start (GstBaseSrc * bsrc) src = GST_UDPSRC (bsrc); -#ifdef G_OS_WIN32 - GST_DEBUG_OBJECT (src, "creating pipe"); - - /* This should work on UNIX too. PF_UNIX sockets replaced with pipe */ - /* pipe( CONTROL_SOCKETS(src), 4096, _O_BINARY ) */ - if ((ret = _pipe (CONTROL_SOCKETS (src), 4096, _O_BINARY)) < 0) - goto no_socket_pair; -#else - GST_DEBUG_OBJECT (src, "creating socket pair"); - if ((ret = socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (src))) < 0) - goto no_socket_pair; - - fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK); - fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK); -#endif - if (!inet_aton (src->multi_group, &(src->multi_addr.imr_multiaddr))) src->multi_addr.imr_multiaddr.s_addr = 0; @@ -737,12 +678,12 @@ gst_udpsrc_start (GstBaseSrc * bsrc) if ((ret = socket (PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) goto no_socket; - src->sock = ret; + src->sock.fd = ret; src->externalfd = FALSE; reuse = 1; if ((ret = - setsockopt (src->sock, SOL_SOCKET, SO_REUSEADDR, &reuse, + setsockopt (src->sock.fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof (reuse))) < 0) goto setsockopt_error; @@ -756,25 +697,26 @@ gst_udpsrc_start (GstBaseSrc * bsrc) src->myaddr.sin_addr.s_addr = INADDR_ANY; GST_DEBUG_OBJECT (src, "binding on port %d", src->port); - if ((ret = bind (src->sock, (struct sockaddr *) &src->myaddr, + if ((ret = bind (src->sock.fd, (struct sockaddr *) &src->myaddr, sizeof (src->myaddr))) < 0) goto bind_error; } else { /* we use the configured socket */ - src->sock = src->sockfd; + src->sock.fd = src->sockfd; src->externalfd = TRUE; } if (src->multi_addr.imr_multiaddr.s_addr) { src->multi_addr.imr_interface.s_addr = INADDR_ANY; if ((ret = - setsockopt (src->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, + setsockopt (src->sock.fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &src->multi_addr, sizeof (src->multi_addr))) < 0) goto membership; } len = sizeof (my_addr); - if ((ret = getsockname (src->sock, (struct sockaddr *) &my_addr, &len)) < 0) + if ((ret = + getsockname (src->sock.fd, (struct sockaddr *) &my_addr, &len)) < 0) goto getsockname_error; len = sizeof (rcvsize); @@ -785,7 +727,9 @@ gst_udpsrc_start (GstBaseSrc * bsrc) /* set buffer size, Note that on Linux this is typically limited to a * maximum of around 100K. Also a minimum of 128 bytes is required on * Linux. */ - ret = setsockopt (src->sock, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, len); + ret = + setsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, + len); if (ret != 0) goto udpbuffer_error; } @@ -793,14 +737,15 @@ gst_udpsrc_start (GstBaseSrc * bsrc) /* read the value of the receive buffer. Note that on linux this returns 2x the * value we set because the kernel allocates extra memory for metadata. * The default on Linux is about 100K (which is about 50K without metadata) */ - ret = getsockopt (src->sock, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, &len); + ret = + getsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, &len); if (ret == 0) GST_DEBUG_OBJECT (src, "have udp buffer of %d bytes", rcvsize); else GST_DEBUG_OBJECT (src, "could not get udp buffer size"); bc_val = 1; - if ((ret = setsockopt (src->sock, SOL_SOCKET, SO_BROADCAST, &bc_val, + if ((ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_BROADCAST, &bc_val, sizeof (bc_val))) < 0) goto no_broadcast; @@ -814,15 +759,20 @@ gst_udpsrc_start (GstBaseSrc * bsrc) src->myaddr.sin_port = htons (src->port + 1); +#ifdef G_OS_WIN32 + if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, FALSE)) == NULL) + goto no_fdset; +#else + if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL) + goto no_fdset; +#endif + + gst_poll_add_fd (src->fdset, &src->sock); + gst_poll_fd_ctl_read (src->fdset, &src->sock, TRUE); + return TRUE; /* ERRORS */ -no_socket_pair: - { - GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), - ("no socket pair %d: %s (%d)", ret, g_strerror (errno), errno)); - return FALSE; - } no_socket: { GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), @@ -873,19 +823,25 @@ no_broadcast: g_strerror (errno), errno)); return FALSE; } +no_fdset: + { + CLOSE_IF_REQUESTED (src); + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), + ("could not create an fdset %d: %s (%d)", ret, g_strerror (errno), + errno)); + return FALSE; + } } static gboolean gst_udpsrc_unlock (GstBaseSrc * bsrc) { GstUDPSrc *src; - gint res; src = GST_UDPSRC (bsrc); - GST_LOG_OBJECT (src, "sending stop command"); - SEND_COMMAND (src, CONTROL_STOP, res); - GST_LOG_OBJECT (src, "sent stop command %d", res); + GST_LOG_OBJECT (src, "Flushing"); + gst_poll_set_flushing (src->fdset, TRUE); return TRUE; } @@ -897,21 +853,8 @@ gst_udpsrc_unlock_stop (GstBaseSrc * bsrc) src = GST_UDPSRC (bsrc); - GST_LOG_OBJECT (src, "clearing unlock command queue"); - - while (TRUE) { - gchar command; - int res; - - GST_LOG_OBJECT (src, "reading command"); - - READ_COMMAND (src, command, res); - if (res <= 0) { - GST_LOG_OBJECT (src, "no more commands"); - /* no more commands */ - break; - } - } + GST_LOG_OBJECT (src, "No longer flushing"); + gst_poll_set_flushing (src->fdset, FALSE); return TRUE; } @@ -925,18 +868,13 @@ gst_udpsrc_stop (GstBaseSrc * bsrc) GST_DEBUG ("stopping, closing sockets"); - if (src->sock != -1) { + if (src->sock.fd >= 0) { CLOSE_IF_REQUESTED (src); } - /* pipes on WIN32 else sockets */ - if (src->control_sock[0] != -1) { - close (src->control_sock[0]); - src->control_sock[0] = -1; - } - if (src->control_sock[1] != -1) { - close (src->control_sock[1]); - src->control_sock[1] = -1; + if (src->fdset) { + gst_poll_free (src->fdset); + src->fdset = NULL; } WSA_CLEANUP (src); diff --git a/gst/udp/gstudpsrc.h b/gst/udp/gstudpsrc.h index c672d2a2..e736a592 100644 --- a/gst/udp/gstudpsrc.h +++ b/gst/udp/gstudpsrc.h @@ -52,21 +52,21 @@ struct _GstUDPSrc { GstPushSrc parent; /* properties */ - gchar *uri; - int port; - gchar *multi_group; - gint ttl; - GstCaps *caps; - gint buffer_size; - guint64 timeout; - gint skip_first_bytes; - int sockfd; - gboolean closefd; + gchar *uri; + int port; + gchar *multi_group; + gint ttl; + GstCaps *caps; + gint buffer_size; + guint64 timeout; + gint skip_first_bytes; + int sockfd; + gboolean closefd; /* our sockets */ - int sock; - int control_sock[2]; - gboolean externalfd; + GstPollFD sock; + GstPoll *fdset; + gboolean externalfd; struct sockaddr_in myaddr; struct ip_mreq multi_addr; |