summaryrefslogtreecommitdiffstats
path: root/gst/udp
diff options
context:
space:
mode:
authorPeter Kjellerstedt <pkj@axis.com>2008-02-28 11:51:24 +0000
committerWim Taymans <wim.taymans@gmail.com>2008-02-28 11:51:24 +0000
commit9c814472e56ed0a6c70755cf9c70860a5fb32e08 (patch)
tree79b2ef003eab9c71c5d7829abc84c40fa36cc59f /gst/udp
parent593528c5f6bde202cf4f420afd077084ca153dd3 (diff)
gst/udp/gstudpsrc.*: Port to GstPoll. See #505417.
Original commit message from CVS: Patch by: Peter Kjellerstedt <pkj at axis com> * gst/udp/gstudpsrc.c: (gst_udpsrc_init), (gst_udpsrc_create), (gst_udpsrc_get_property), (gst_udpsrc_start), (gst_udpsrc_unlock), (gst_udpsrc_unlock_stop), (gst_udpsrc_stop): * gst/udp/gstudpsrc.h: Port to GstPoll. See #505417.
Diffstat (limited to 'gst/udp')
-rw-r--r--gst/udp/gstudpsrc.c180
-rw-r--r--gst/udp/gstudpsrc.h26
2 files changed, 72 insertions, 134 deletions
diff --git a/gst/udp/gstudpsrc.c b/gst/udp/gstudpsrc.c
index 8964f837..a7272501 100644
--- a/gst/udp/gstudpsrc.c
+++ b/gst/udp/gstudpsrc.c
@@ -141,29 +141,12 @@ typedef int socklen_t;
GST_DEBUG_CATEGORY_STATIC (udpsrc_debug);
#define GST_CAT_DEFAULT (udpsrc_debug)
-/* the select call is also performed on the control sockets, that way
- * we can send special commands to unblock or restart the select call */
-#define CONTROL_RESTART 'R' /* restart the select call */
-#define CONTROL_STOP 'S' /* stop the select call */
-#define CONTROL_SOCKETS(src) src->control_sock
-#define WRITE_SOCKET(src) src->control_sock[1]
-#define READ_SOCKET(src) src->control_sock[0]
-
-#define SEND_COMMAND(src, command, res) \
-G_STMT_START { \
- unsigned char c; c = command; \
- res = write (WRITE_SOCKET(src), &c, 1); \
-} G_STMT_END
-
-#define READ_COMMAND(src, command, res) \
-G_STMT_START { \
- res = read(READ_SOCKET(src), &command, 1); \
-} G_STMT_END
-
#define CLOSE_IF_REQUESTED(udpctx) \
+G_STMT_START { \
if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) \
- CLOSE_SOCKET(udpctx->sock); \
- udpctx->sock = -1;
+ CLOSE_SOCKET(udpctx->sock.fd); \
+ udpctx->sock.fd = -1; \
+} G_STMT_END
static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
@@ -326,9 +309,7 @@ gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class)
udpsrc->closefd = UDP_DEFAULT_CLOSEFD;
udpsrc->externalfd = (udpsrc->sockfd != -1);
- udpsrc->sock = UDP_DEFAULT_SOCK;
- udpsrc->control_sock[0] = -1;
- udpsrc->control_sock[1] = -1;
+ udpsrc->sock.fd = UDP_DEFAULT_SOCK;
gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME);
gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE);
}
@@ -368,8 +349,6 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
GstNetBuffer *outbuf;
struct sockaddr_in tmpaddr;
socklen_t len;
- fd_set read_fds;
- guint max_sock;
guint8 *pktdata;
gint pktsize;
@@ -378,6 +357,7 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
#elif defined G_OS_WIN32
gulong readsize;
#endif
+ GstClockTime timeout;
gint ret;
gboolean try_again;
@@ -386,49 +366,29 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
retry:
/* quick check, avoid going in select when we already have data */
readsize = 0;
- if ((ret = IOCTL_SOCKET (udpsrc->sock, FIONREAD, &readsize)) < 0)
+ if ((ret = IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0)
goto ioctl_failed;
if (readsize > 0)
goto no_select;
- do {
- gboolean stop;
- struct timeval timeval, *timeout;
-
- FD_ZERO (&read_fds);
- FD_SET (udpsrc->sock, &read_fds);
-#ifndef G_OS_WIN32
- FD_SET (READ_SOCKET (udpsrc), &read_fds);
-#endif
- max_sock = MAX (udpsrc->sock, READ_SOCKET (udpsrc));
+ if (udpsrc->timeout > 0) {
+ timeout = udpsrc->timeout * GST_USECOND;
+ } else {
+ timeout = GST_CLOCK_TIME_NONE;
+ }
+ do {
try_again = FALSE;
- stop = FALSE;
GST_LOG_OBJECT (udpsrc, "doing select, timeout %" G_GUINT64_FORMAT,
udpsrc->timeout);
- if (udpsrc->timeout > 0) {
- timeval.tv_sec = udpsrc->timeout / 1000000;
- timeval.tv_usec = udpsrc->timeout % 1000000;
- timeout = &timeval;
- } else {
- timeout = NULL;
- }
-
-#ifdef G_OS_WIN32
- if (((max_sock + 1) != READ_SOCKET (udpsrc)) ||
- ((max_sock + 1) != WRITE_SOCKET (udpsrc))) {
- ret = select (max_sock + 1, &read_fds, NULL, NULL, timeout);
- } else {
- ret = 1;
- }
-#else
- ret = select (max_sock + 1, &read_fds, NULL, NULL, timeout);
-#endif
+ ret = gst_poll_wait (udpsrc->fdset, timeout);
GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
if (ret < 0) {
+ if (errno == EBUSY)
+ goto stopped;
#ifdef G_OS_WIN32
if (WSAGetLastError () != WSAEINTR)
goto select_error;
@@ -444,9 +404,6 @@ retry:
gst_structure_new ("GstUDPSrcTimeout",
"timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
try_again = TRUE;
- } else {
- if (FD_ISSET (READ_SOCKET (udpsrc), &read_fds))
- goto stopped;
}
} while (try_again);
@@ -454,7 +411,7 @@ retry:
* one UDP packet. We will check the return value, though, because in some
* case it can return 0 and we don't want a 0 sized buffer. */
readsize = 0;
- if ((ret = IOCTL_SOCKET (udpsrc->sock, FIONREAD, &readsize)) < 0)
+ if ((ret = IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0)
goto ioctl_failed;
/* if we get here and there is nothing to read from the socket, the select got
@@ -472,7 +429,7 @@ no_select:
while (TRUE) {
len = sizeof (struct sockaddr);
- ret = recvfrom (udpsrc->sock, pktdata, pktsize,
+ ret = recvfrom (udpsrc->sock.fd, pktdata, pktsize,
0, (struct sockaddr *) &tmpaddr, &len);
if (ret < 0) {
if (errno != EAGAIN && errno != EINTR)
@@ -690,7 +647,7 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
g_value_set_boolean (value, udpsrc->closefd);
break;
case PROP_SOCK:
- g_value_set_int (value, udpsrc->sock);
+ g_value_set_int (value, udpsrc->sock.fd);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -713,22 +670,6 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
src = GST_UDPSRC (bsrc);
-#ifdef G_OS_WIN32
- GST_DEBUG_OBJECT (src, "creating pipe");
-
- /* This should work on UNIX too. PF_UNIX sockets replaced with pipe */
- /* pipe( CONTROL_SOCKETS(src), 4096, _O_BINARY ) */
- if ((ret = _pipe (CONTROL_SOCKETS (src), 4096, _O_BINARY)) < 0)
- goto no_socket_pair;
-#else
- GST_DEBUG_OBJECT (src, "creating socket pair");
- if ((ret = socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (src))) < 0)
- goto no_socket_pair;
-
- fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
- fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
-#endif
-
if (!inet_aton (src->multi_group, &(src->multi_addr.imr_multiaddr)))
src->multi_addr.imr_multiaddr.s_addr = 0;
@@ -737,12 +678,12 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
if ((ret = socket (PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)
goto no_socket;
- src->sock = ret;
+ src->sock.fd = ret;
src->externalfd = FALSE;
reuse = 1;
if ((ret =
- setsockopt (src->sock, SOL_SOCKET, SO_REUSEADDR, &reuse,
+ setsockopt (src->sock.fd, SOL_SOCKET, SO_REUSEADDR, &reuse,
sizeof (reuse))) < 0)
goto setsockopt_error;
@@ -756,25 +697,26 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
src->myaddr.sin_addr.s_addr = INADDR_ANY;
GST_DEBUG_OBJECT (src, "binding on port %d", src->port);
- if ((ret = bind (src->sock, (struct sockaddr *) &src->myaddr,
+ if ((ret = bind (src->sock.fd, (struct sockaddr *) &src->myaddr,
sizeof (src->myaddr))) < 0)
goto bind_error;
} else {
/* we use the configured socket */
- src->sock = src->sockfd;
+ src->sock.fd = src->sockfd;
src->externalfd = TRUE;
}
if (src->multi_addr.imr_multiaddr.s_addr) {
src->multi_addr.imr_interface.s_addr = INADDR_ANY;
if ((ret =
- setsockopt (src->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
+ setsockopt (src->sock.fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
&src->multi_addr, sizeof (src->multi_addr))) < 0)
goto membership;
}
len = sizeof (my_addr);
- if ((ret = getsockname (src->sock, (struct sockaddr *) &my_addr, &len)) < 0)
+ if ((ret =
+ getsockname (src->sock.fd, (struct sockaddr *) &my_addr, &len)) < 0)
goto getsockname_error;
len = sizeof (rcvsize);
@@ -785,7 +727,9 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
/* set buffer size, Note that on Linux this is typically limited to a
* maximum of around 100K. Also a minimum of 128 bytes is required on
* Linux. */
- ret = setsockopt (src->sock, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, len);
+ ret =
+ setsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize,
+ len);
if (ret != 0)
goto udpbuffer_error;
}
@@ -793,14 +737,15 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
/* read the value of the receive buffer. Note that on linux this returns 2x the
* value we set because the kernel allocates extra memory for metadata.
* The default on Linux is about 100K (which is about 50K without metadata) */
- ret = getsockopt (src->sock, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, &len);
+ ret =
+ getsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, &len);
if (ret == 0)
GST_DEBUG_OBJECT (src, "have udp buffer of %d bytes", rcvsize);
else
GST_DEBUG_OBJECT (src, "could not get udp buffer size");
bc_val = 1;
- if ((ret = setsockopt (src->sock, SOL_SOCKET, SO_BROADCAST, &bc_val,
+ if ((ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_BROADCAST, &bc_val,
sizeof (bc_val))) < 0)
goto no_broadcast;
@@ -814,15 +759,20 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
src->myaddr.sin_port = htons (src->port + 1);
+#ifdef G_OS_WIN32
+ if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, FALSE)) == NULL)
+ goto no_fdset;
+#else
+ if ((src->fdset = gst_poll_new (GST_POLL_MODE_AUTO, TRUE)) == NULL)
+ goto no_fdset;
+#endif
+
+ gst_poll_add_fd (src->fdset, &src->sock);
+ gst_poll_fd_ctl_read (src->fdset, &src->sock, TRUE);
+
return TRUE;
/* ERRORS */
-no_socket_pair:
- {
- GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
- ("no socket pair %d: %s (%d)", ret, g_strerror (errno), errno));
- return FALSE;
- }
no_socket:
{
GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
@@ -873,19 +823,25 @@ no_broadcast:
g_strerror (errno), errno));
return FALSE;
}
+no_fdset:
+ {
+ CLOSE_IF_REQUESTED (src);
+ GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
+ ("could not create an fdset %d: %s (%d)", ret, g_strerror (errno),
+ errno));
+ return FALSE;
+ }
}
static gboolean
gst_udpsrc_unlock (GstBaseSrc * bsrc)
{
GstUDPSrc *src;
- gint res;
src = GST_UDPSRC (bsrc);
- GST_LOG_OBJECT (src, "sending stop command");
- SEND_COMMAND (src, CONTROL_STOP, res);
- GST_LOG_OBJECT (src, "sent stop command %d", res);
+ GST_LOG_OBJECT (src, "Flushing");
+ gst_poll_set_flushing (src->fdset, TRUE);
return TRUE;
}
@@ -897,21 +853,8 @@ gst_udpsrc_unlock_stop (GstBaseSrc * bsrc)
src = GST_UDPSRC (bsrc);
- GST_LOG_OBJECT (src, "clearing unlock command queue");
-
- while (TRUE) {
- gchar command;
- int res;
-
- GST_LOG_OBJECT (src, "reading command");
-
- READ_COMMAND (src, command, res);
- if (res <= 0) {
- GST_LOG_OBJECT (src, "no more commands");
- /* no more commands */
- break;
- }
- }
+ GST_LOG_OBJECT (src, "No longer flushing");
+ gst_poll_set_flushing (src->fdset, FALSE);
return TRUE;
}
@@ -925,18 +868,13 @@ gst_udpsrc_stop (GstBaseSrc * bsrc)
GST_DEBUG ("stopping, closing sockets");
- if (src->sock != -1) {
+ if (src->sock.fd >= 0) {
CLOSE_IF_REQUESTED (src);
}
- /* pipes on WIN32 else sockets */
- if (src->control_sock[0] != -1) {
- close (src->control_sock[0]);
- src->control_sock[0] = -1;
- }
- if (src->control_sock[1] != -1) {
- close (src->control_sock[1]);
- src->control_sock[1] = -1;
+ if (src->fdset) {
+ gst_poll_free (src->fdset);
+ src->fdset = NULL;
}
WSA_CLEANUP (src);
diff --git a/gst/udp/gstudpsrc.h b/gst/udp/gstudpsrc.h
index c672d2a2..e736a592 100644
--- a/gst/udp/gstudpsrc.h
+++ b/gst/udp/gstudpsrc.h
@@ -52,21 +52,21 @@ struct _GstUDPSrc {
GstPushSrc parent;
/* properties */
- gchar *uri;
- int port;
- gchar *multi_group;
- gint ttl;
- GstCaps *caps;
- gint buffer_size;
- guint64 timeout;
- gint skip_first_bytes;
- int sockfd;
- gboolean closefd;
+ gchar *uri;
+ int port;
+ gchar *multi_group;
+ gint ttl;
+ GstCaps *caps;
+ gint buffer_size;
+ guint64 timeout;
+ gint skip_first_bytes;
+ int sockfd;
+ gboolean closefd;
/* our sockets */
- int sock;
- int control_sock[2];
- gboolean externalfd;
+ GstPollFD sock;
+ GstPoll *fdset;
+ gboolean externalfd;
struct sockaddr_in myaddr;
struct ip_mreq multi_addr;