summaryrefslogtreecommitdiffstats
path: root/gst/udp
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@gmail.com>2008-06-13 11:54:05 +0000
committerWim Taymans <wim.taymans@gmail.com>2008-06-13 11:54:05 +0000
commitccddfc5da7afbd4475093a3b77231ed42911e5c8 (patch)
treeb6463959d2bb837ca49fc186679b0000ecd48d6f /gst/udp
parent5b751d02901fd672973d411d153820fc33f40508 (diff)
gst/udp/gstudpsrc.*: Add property to control automatic join/leave of multicast groups.
Original commit message from CVS: * gst/udp/gstudpsrc.c: (gst_udpsrc_class_init), (gst_udpsrc_init), (gst_udpsrc_create), (gst_udpsrc_set_property), (gst_udpsrc_get_property), (gst_udpsrc_start), (gst_udpsrc_stop): * gst/udp/gstudpsrc.h: Add property to control automatic join/leave of multicast groups. Add G_LIKELY. Remove setting caps on buffers explicitly, basesrc does that for us now. Improve debug info. Convert some non-fatal error into warnings. Use g_ntohs for better portability. Leave multicast groups when stopping. When using external sockets, use getsockname() on them to fill up the addr structure before calling methods that use the structure. Should all fix #536903. API: GstUDPSrc::auto-multicast property
Diffstat (limited to 'gst/udp')
-rw-r--r--gst/udp/gstudpsrc.c124
-rw-r--r--gst/udp/gstudpsrc.h2
2 files changed, 72 insertions, 54 deletions
diff --git a/gst/udp/gstudpsrc.c b/gst/udp/gstudpsrc.c
index 40d91537..a9341344 100644
--- a/gst/udp/gstudpsrc.c
+++ b/gst/udp/gstudpsrc.c
@@ -170,10 +170,12 @@ GST_ELEMENT_DETAILS ("UDP packet receiver",
#define UDP_DEFAULT_SKIP_FIRST_BYTES 0
#define UDP_DEFAULT_CLOSEFD TRUE
#define UDP_DEFAULT_SOCK -1
+#define UDP_DEFAULT_AUTO_MULTICAST TRUE
enum
{
PROP_0,
+
PROP_PORT,
PROP_MULTICAST_GROUP,
PROP_URI,
@@ -183,7 +185,10 @@ enum
PROP_TIMEOUT,
PROP_SKIP_FIRST_BYTES,
PROP_CLOSEFD,
- PROP_SOCK
+ PROP_SOCK,
+ PROP_AUTO_MULTICAST,
+
+ PROP_LAST
};
static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
@@ -283,6 +288,10 @@ gst_udpsrc_class_init (GstUDPSrcClass * klass)
g_param_spec_int ("sock", "Socket Handle",
"Socket currently in use for UDP reception. (-1 = no socket)",
-1, G_MAXINT, UDP_DEFAULT_SOCK, G_PARAM_READABLE));
+ g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
+ g_param_spec_boolean ("auto-multicast", "Auto Multicast",
+ "Automatically join/leave multicast groups",
+ UDP_DEFAULT_AUTO_MULTICAST, G_PARAM_READWRITE));
gstbasesrc_class->start = gst_udpsrc_start;
gstbasesrc_class->stop = gst_udpsrc_stop;
@@ -298,7 +307,6 @@ gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class)
{
WSA_STARTUP (udpsrc);
- gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
udpsrc->port = UDP_DEFAULT_PORT;
udpsrc->sockfd = UDP_DEFAULT_SOCKFD;
udpsrc->multi_group = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
@@ -308,9 +316,15 @@ gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class)
udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES;
udpsrc->closefd = UDP_DEFAULT_CLOSEFD;
udpsrc->externalfd = (udpsrc->sockfd != -1);
-
+ udpsrc->auto_multicast = UDP_DEFAULT_AUTO_MULTICAST;
udpsrc->sock.fd = UDP_DEFAULT_SOCK;
+
+ /* configure basesrc to be a live source */
+ gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
+ /* make basesrc output a segment in time */
gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME);
+ /* make basesrc set timestamps on outgoing buffers based on the running_time
+ * when they were captured */
gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE);
}
@@ -363,12 +377,13 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
gint ret;
gboolean try_again;
- udpsrc = GST_UDPSRC (psrc);
+ udpsrc = GST_UDPSRC_CAST (psrc);
retry:
/* quick check, avoid going in select when we already have data */
readsize = 0;
- if ((ret = IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0)
+ if (G_UNLIKELY ((ret =
+ IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
goto ioctl_failed;
if (readsize > 0)
@@ -388,7 +403,7 @@ retry:
ret = gst_poll_wait (udpsrc->fdset, timeout);
GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
- if (ret < 0) {
+ if (G_UNLIKELY (ret < 0)) {
if (errno == EBUSY)
goto stopped;
#ifdef G_OS_WIN32
@@ -399,7 +414,7 @@ retry:
goto select_error;
#endif
try_again = TRUE;
- } else if (ret == 0) {
+ } else if (G_UNLIKELY (ret == 0)) {
/* timeout, post element message */
gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
gst_message_new_element (GST_OBJECT_CAST (udpsrc),
@@ -407,20 +422,21 @@ retry:
"timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
try_again = TRUE;
}
- } while (try_again);
+ } while (G_UNLIKELY (try_again));
/* ask how much is available for reading on the socket, this should be exactly
* one UDP packet. We will check the return value, though, because in some
* case it can return 0 and we don't want a 0 sized buffer. */
readsize = 0;
- if ((ret = IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0)
+ if (G_UNLIKELY ((ret =
+ IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
goto ioctl_failed;
/* if we get here and there is nothing to read from the socket, the select got
- * woken up by activity on the socket but it was not a read. We how someone
+ * woken up by activity on the socket but it was not a read. We know someone
* will also do something with the socket so that we don't go into an infinite
* loop in the select(). */
- if (!readsize)
+ if (G_UNLIKELY (!readsize))
goto retry;
no_select:
@@ -433,7 +449,7 @@ no_select:
len = sizeof (struct sockaddr);
ret = recvfrom (udpsrc->sock.fd, pktdata, pktsize,
0, (struct sockaddr *) &tmpaddr, &len);
- if (ret < 0) {
+ if (G_UNLIKELY (ret < 0)) {
#ifdef G_OS_WIN32
/* WSAECONNRESET for a UDP socket means that a packet sent with udpsink
* generated a "port unreachable" ICMP response. We ignore that and try
@@ -458,7 +474,7 @@ no_select:
GST_BUFFER_MALLOCDATA (outbuf) = pktdata;
/* patch pktdata and len when stripping off the headers */
- if (udpsrc->skip_first_bytes != 0) {
+ if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) {
if (G_UNLIKELY (readsize <= udpsrc->skip_first_bytes))
goto skip_error;
@@ -490,9 +506,6 @@ no_select:
errno = EAFNOSUPPORT;
goto receive_error;
}
-
- gst_buffer_set_caps (GST_BUFFER_CAST (outbuf), udpsrc->caps);
-
GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize);
*buf = GST_BUFFER_CAST (outbuf);
@@ -647,6 +660,9 @@ gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
case PROP_CLOSEFD:
udpsrc->closefd = g_value_get_boolean (value);
break;
+ case PROP_AUTO_MULTICAST:
+ udpsrc->auto_multicast = g_value_get_boolean (value);
+ break;
default:
break;
}
@@ -689,6 +705,9 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_SOCK:
g_value_set_int (value, udpsrc->sock.fd);
break;
+ case PROP_AUTO_MULTICAST:
+ g_value_set_boolean (value, udpsrc->auto_multicast);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -701,20 +720,22 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
{
guint bc_val;
gint reuse;
- struct sockaddr_storage my_addr;
- guint len;
int port;
GstUDPSrc *src;
gint ret;
int rcvsize;
+ guint len;
src = GST_UDPSRC (bsrc);
if (src->sockfd == -1) {
/* need to allocate a socket */
+ GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->multi_group,
+ src->port);
if ((ret =
gst_udp_get_addr (src->multi_group, src->port, &src->myaddr)) < 0)
goto getaddrinfo_error;
+
if ((ret = socket (src->myaddr.ss_family, SOCK_DGRAM, IPPROTO_UDP)) < 0)
goto no_socket;
@@ -732,22 +753,18 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
sizeof (src->myaddr))) < 0)
goto bind_error;
} else {
- /* we use the configured socket */
+ GST_DEBUG_OBJECT (src, "using provided socket %d", src->sockfd);
+ /* we use the configured socket, try to get some info about it */
+ len = sizeof (src->myaddr);
+ if ((ret =
+ getsockname (src->sockfd, (struct sockaddr *) &src->myaddr,
+ &len)) < 0)
+ goto getsockname_error;
+
src->sock.fd = src->sockfd;
src->externalfd = TRUE;
}
- if (gst_udp_is_multicast (&src->myaddr)) {
- ret = gst_udp_join_group (src->sock.fd, &src->myaddr);
- if (ret < 0)
- goto membership;
- }
-
- len = sizeof (my_addr);
- if ((ret =
- getsockname (src->sock.fd, (struct sockaddr *) &my_addr, &len)) < 0)
- goto getsockname_error;
-
len = sizeof (rcvsize);
if (src->buffer_size != 0) {
rcvsize = src->buffer_size;
@@ -759,8 +776,11 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
ret =
setsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize,
len);
- if (ret != 0)
- goto udpbuffer_error;
+ if (ret != 0) {
+ GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
+ ("Could not create a buffer of requested %d bytes, %d: %s (%d)",
+ rcvsize, ret, g_strerror (errno), errno));
+ }
}
/* read the value of the receive buffer. Note that on linux this returns 2x the
@@ -775,21 +795,29 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
bc_val = 1;
if ((ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_BROADCAST, &bc_val,
- sizeof (bc_val))) < 0)
- goto no_broadcast;
+ sizeof (bc_val))) < 0) {
+ GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
+ ("could not configure socket for broadcast %d: %s (%d)", ret,
+ g_strerror (errno), errno));
+ }
+
+ if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
+ GST_DEBUG_OBJECT (src, "joining multicast group %s", src->multi_group);
+ ret = gst_udp_join_group (src->sock.fd, &src->myaddr);
+ if (ret < 0)
+ goto membership;
+ }
/* NOTE: sockaddr_in.sin_port works for ipv4 and ipv6 because sin_port
* follows ss_family on both */
- port = ntohs (((struct sockaddr_in *) &my_addr)->sin_port);
+ port = g_ntohs (((struct sockaddr_in *) &src->myaddr)->sin_port);
GST_DEBUG_OBJECT (src, "bound, on port %d", port);
if (port != src->port) {
src->port = port;
- GST_DEBUG_OBJECT (src, "notifying %d", port);
+ GST_DEBUG_OBJECT (src, "notifying port %d", port);
g_object_notify (G_OBJECT (src), "port");
}
- ((struct sockaddr_in *) &src->myaddr)->sin_port = htons (src->port + 1);
-
if ((src->fdset = gst_poll_new (TRUE)) == NULL)
goto no_fdset;
@@ -839,22 +867,6 @@ getsockname_error:
("getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno));
return FALSE;
}
-udpbuffer_error:
- {
- CLOSE_IF_REQUESTED (src);
- GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
- ("Could not create a buffer of the size requested, %d: %s (%d)", ret,
- g_strerror (errno), errno));
- return FALSE;
- }
-no_broadcast:
- {
- CLOSE_IF_REQUESTED (src);
- GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
- ("could not configure socket for broadcast %d: %s (%d)", ret,
- g_strerror (errno), errno));
- return FALSE;
- }
no_fdset:
{
CLOSE_IF_REQUESTED (src);
@@ -901,6 +913,10 @@ gst_udpsrc_stop (GstBaseSrc * bsrc)
GST_DEBUG ("stopping, closing sockets");
if (src->sock.fd >= 0) {
+ if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
+ GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->multi_group);
+ gst_udp_leave_group (src->sock.fd, &src->myaddr);
+ }
CLOSE_IF_REQUESTED (src);
}
diff --git a/gst/udp/gstudpsrc.h b/gst/udp/gstudpsrc.h
index 597a6a70..cdf7b35e 100644
--- a/gst/udp/gstudpsrc.h
+++ b/gst/udp/gstudpsrc.h
@@ -44,6 +44,7 @@ G_BEGIN_DECLS
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_UDPSRC))
#define GST_IS_UDPSRC_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_UDPSRC))
+#define GST_UDPSRC_CAST(obj) ((GstUDPSrc *)(obj))
typedef struct _GstUDPSrc GstUDPSrc;
typedef struct _GstUDPSrcClass GstUDPSrcClass;
@@ -62,6 +63,7 @@ struct _GstUDPSrc {
gint skip_first_bytes;
int sockfd;
gboolean closefd;
+ gboolean auto_multicast;
/* our sockets */
GstPollFD sock;