summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog18
-rw-r--r--gst/udp/gstudpsrc.c124
-rw-r--r--gst/udp/gstudpsrc.h2
3 files changed, 90 insertions, 54 deletions
diff --git a/ChangeLog b/ChangeLog
index cdb2dddf..7929a211 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,23 @@
2008-06-13 Wim Taymans <wim.taymans@collabora.co.uk>
+ * gst/udp/gstudpsrc.c: (gst_udpsrc_class_init), (gst_udpsrc_init),
+ (gst_udpsrc_create), (gst_udpsrc_set_property),
+ (gst_udpsrc_get_property), (gst_udpsrc_start), (gst_udpsrc_stop):
+ * gst/udp/gstudpsrc.h:
+ Add property to control automatic join/leave of multicast groups.
+ Add G_LIKELY.
+ Remove setting caps on buffers explicitly, basesrc does that for us now.
+ Improve debug info.
+ Convert some non-fatal error into warnings.
+ Use g_ntohs for better portability.
+ Leave multicast groups when stopping.
+ When using external sockets, use getsockname() on them to fill up the
+ addr structure before calling methods that use the structure.
+ Should all fix #536903.
+ API: GstUDPSrc::auto-multicast property
+
+2008-06-13 Wim Taymans <wim.taymans@collabora.co.uk>
+
* gst/udp/gstudpnetutils.c: (gst_udp_is_multicast):
Use g_ntohl for better portability.
diff --git a/gst/udp/gstudpsrc.c b/gst/udp/gstudpsrc.c
index 40d91537..a9341344 100644
--- a/gst/udp/gstudpsrc.c
+++ b/gst/udp/gstudpsrc.c
@@ -170,10 +170,12 @@ GST_ELEMENT_DETAILS ("UDP packet receiver",
#define UDP_DEFAULT_SKIP_FIRST_BYTES 0
#define UDP_DEFAULT_CLOSEFD TRUE
#define UDP_DEFAULT_SOCK -1
+#define UDP_DEFAULT_AUTO_MULTICAST TRUE
enum
{
PROP_0,
+
PROP_PORT,
PROP_MULTICAST_GROUP,
PROP_URI,
@@ -183,7 +185,10 @@ enum
PROP_TIMEOUT,
PROP_SKIP_FIRST_BYTES,
PROP_CLOSEFD,
- PROP_SOCK
+ PROP_SOCK,
+ PROP_AUTO_MULTICAST,
+
+ PROP_LAST
};
static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
@@ -283,6 +288,10 @@ gst_udpsrc_class_init (GstUDPSrcClass * klass)
g_param_spec_int ("sock", "Socket Handle",
"Socket currently in use for UDP reception. (-1 = no socket)",
-1, G_MAXINT, UDP_DEFAULT_SOCK, G_PARAM_READABLE));
+ g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
+ g_param_spec_boolean ("auto-multicast", "Auto Multicast",
+ "Automatically join/leave multicast groups",
+ UDP_DEFAULT_AUTO_MULTICAST, G_PARAM_READWRITE));
gstbasesrc_class->start = gst_udpsrc_start;
gstbasesrc_class->stop = gst_udpsrc_stop;
@@ -298,7 +307,6 @@ gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class)
{
WSA_STARTUP (udpsrc);
- gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
udpsrc->port = UDP_DEFAULT_PORT;
udpsrc->sockfd = UDP_DEFAULT_SOCKFD;
udpsrc->multi_group = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
@@ -308,9 +316,15 @@ gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class)
udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES;
udpsrc->closefd = UDP_DEFAULT_CLOSEFD;
udpsrc->externalfd = (udpsrc->sockfd != -1);
-
+ udpsrc->auto_multicast = UDP_DEFAULT_AUTO_MULTICAST;
udpsrc->sock.fd = UDP_DEFAULT_SOCK;
+
+ /* configure basesrc to be a live source */
+ gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
+ /* make basesrc output a segment in time */
gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME);
+ /* make basesrc set timestamps on outgoing buffers based on the running_time
+ * when they were captured */
gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE);
}
@@ -363,12 +377,13 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
gint ret;
gboolean try_again;
- udpsrc = GST_UDPSRC (psrc);
+ udpsrc = GST_UDPSRC_CAST (psrc);
retry:
/* quick check, avoid going in select when we already have data */
readsize = 0;
- if ((ret = IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0)
+ if (G_UNLIKELY ((ret =
+ IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
goto ioctl_failed;
if (readsize > 0)
@@ -388,7 +403,7 @@ retry:
ret = gst_poll_wait (udpsrc->fdset, timeout);
GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
- if (ret < 0) {
+ if (G_UNLIKELY (ret < 0)) {
if (errno == EBUSY)
goto stopped;
#ifdef G_OS_WIN32
@@ -399,7 +414,7 @@ retry:
goto select_error;
#endif
try_again = TRUE;
- } else if (ret == 0) {
+ } else if (G_UNLIKELY (ret == 0)) {
/* timeout, post element message */
gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
gst_message_new_element (GST_OBJECT_CAST (udpsrc),
@@ -407,20 +422,21 @@ retry:
"timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
try_again = TRUE;
}
- } while (try_again);
+ } while (G_UNLIKELY (try_again));
/* ask how much is available for reading on the socket, this should be exactly
* one UDP packet. We will check the return value, though, because in some
* case it can return 0 and we don't want a 0 sized buffer. */
readsize = 0;
- if ((ret = IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0)
+ if (G_UNLIKELY ((ret =
+ IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
goto ioctl_failed;
/* if we get here and there is nothing to read from the socket, the select got
- * woken up by activity on the socket but it was not a read. We how someone
+ * woken up by activity on the socket but it was not a read. We know someone
* will also do something with the socket so that we don't go into an infinite
* loop in the select(). */
- if (!readsize)
+ if (G_UNLIKELY (!readsize))
goto retry;
no_select:
@@ -433,7 +449,7 @@ no_select:
len = sizeof (struct sockaddr);
ret = recvfrom (udpsrc->sock.fd, pktdata, pktsize,
0, (struct sockaddr *) &tmpaddr, &len);
- if (ret < 0) {
+ if (G_UNLIKELY (ret < 0)) {
#ifdef G_OS_WIN32
/* WSAECONNRESET for a UDP socket means that a packet sent with udpsink
* generated a "port unreachable" ICMP response. We ignore that and try
@@ -458,7 +474,7 @@ no_select:
GST_BUFFER_MALLOCDATA (outbuf) = pktdata;
/* patch pktdata and len when stripping off the headers */
- if (udpsrc->skip_first_bytes != 0) {
+ if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) {
if (G_UNLIKELY (readsize <= udpsrc->skip_first_bytes))
goto skip_error;
@@ -490,9 +506,6 @@ no_select:
errno = EAFNOSUPPORT;
goto receive_error;
}
-
- gst_buffer_set_caps (GST_BUFFER_CAST (outbuf), udpsrc->caps);
-
GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize);
*buf = GST_BUFFER_CAST (outbuf);
@@ -647,6 +660,9 @@ gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
case PROP_CLOSEFD:
udpsrc->closefd = g_value_get_boolean (value);
break;
+ case PROP_AUTO_MULTICAST:
+ udpsrc->auto_multicast = g_value_get_boolean (value);
+ break;
default:
break;
}
@@ -689,6 +705,9 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_SOCK:
g_value_set_int (value, udpsrc->sock.fd);
break;
+ case PROP_AUTO_MULTICAST:
+ g_value_set_boolean (value, udpsrc->auto_multicast);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -701,20 +720,22 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
{
guint bc_val;
gint reuse;
- struct sockaddr_storage my_addr;
- guint len;
int port;
GstUDPSrc *src;
gint ret;
int rcvsize;
+ guint len;
src = GST_UDPSRC (bsrc);
if (src->sockfd == -1) {
/* need to allocate a socket */
+ GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->multi_group,
+ src->port);
if ((ret =
gst_udp_get_addr (src->multi_group, src->port, &src->myaddr)) < 0)
goto getaddrinfo_error;
+
if ((ret = socket (src->myaddr.ss_family, SOCK_DGRAM, IPPROTO_UDP)) < 0)
goto no_socket;
@@ -732,22 +753,18 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
sizeof (src->myaddr))) < 0)
goto bind_error;
} else {
- /* we use the configured socket */
+ GST_DEBUG_OBJECT (src, "using provided socket %d", src->sockfd);
+ /* we use the configured socket, try to get some info about it */
+ len = sizeof (src->myaddr);
+ if ((ret =
+ getsockname (src->sockfd, (struct sockaddr *) &src->myaddr,
+ &len)) < 0)
+ goto getsockname_error;
+
src->sock.fd = src->sockfd;
src->externalfd = TRUE;
}
- if (gst_udp_is_multicast (&src->myaddr)) {
- ret = gst_udp_join_group (src->sock.fd, &src->myaddr);
- if (ret < 0)
- goto membership;
- }
-
- len = sizeof (my_addr);
- if ((ret =
- getsockname (src->sock.fd, (struct sockaddr *) &my_addr, &len)) < 0)
- goto getsockname_error;
-
len = sizeof (rcvsize);
if (src->buffer_size != 0) {
rcvsize = src->buffer_size;
@@ -759,8 +776,11 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
ret =
setsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize,
len);
- if (ret != 0)
- goto udpbuffer_error;
+ if (ret != 0) {
+ GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
+ ("Could not create a buffer of requested %d bytes, %d: %s (%d)",
+ rcvsize, ret, g_strerror (errno), errno));
+ }
}
/* read the value of the receive buffer. Note that on linux this returns 2x the
@@ -775,21 +795,29 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
bc_val = 1;
if ((ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_BROADCAST, &bc_val,
- sizeof (bc_val))) < 0)
- goto no_broadcast;
+ sizeof (bc_val))) < 0) {
+ GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
+ ("could not configure socket for broadcast %d: %s (%d)", ret,
+ g_strerror (errno), errno));
+ }
+
+ if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
+ GST_DEBUG_OBJECT (src, "joining multicast group %s", src->multi_group);
+ ret = gst_udp_join_group (src->sock.fd, &src->myaddr);
+ if (ret < 0)
+ goto membership;
+ }
/* NOTE: sockaddr_in.sin_port works for ipv4 and ipv6 because sin_port
* follows ss_family on both */
- port = ntohs (((struct sockaddr_in *) &my_addr)->sin_port);
+ port = g_ntohs (((struct sockaddr_in *) &src->myaddr)->sin_port);
GST_DEBUG_OBJECT (src, "bound, on port %d", port);
if (port != src->port) {
src->port = port;
- GST_DEBUG_OBJECT (src, "notifying %d", port);
+ GST_DEBUG_OBJECT (src, "notifying port %d", port);
g_object_notify (G_OBJECT (src), "port");
}
- ((struct sockaddr_in *) &src->myaddr)->sin_port = htons (src->port + 1);
-
if ((src->fdset = gst_poll_new (TRUE)) == NULL)
goto no_fdset;
@@ -839,22 +867,6 @@ getsockname_error:
("getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno));
return FALSE;
}
-udpbuffer_error:
- {
- CLOSE_IF_REQUESTED (src);
- GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
- ("Could not create a buffer of the size requested, %d: %s (%d)", ret,
- g_strerror (errno), errno));
- return FALSE;
- }
-no_broadcast:
- {
- CLOSE_IF_REQUESTED (src);
- GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
- ("could not configure socket for broadcast %d: %s (%d)", ret,
- g_strerror (errno), errno));
- return FALSE;
- }
no_fdset:
{
CLOSE_IF_REQUESTED (src);
@@ -901,6 +913,10 @@ gst_udpsrc_stop (GstBaseSrc * bsrc)
GST_DEBUG ("stopping, closing sockets");
if (src->sock.fd >= 0) {
+ if (src->auto_multicast && gst_udp_is_multicast (&src->myaddr)) {
+ GST_DEBUG_OBJECT (src, "leaving multicast group %s", src->multi_group);
+ gst_udp_leave_group (src->sock.fd, &src->myaddr);
+ }
CLOSE_IF_REQUESTED (src);
}
diff --git a/gst/udp/gstudpsrc.h b/gst/udp/gstudpsrc.h
index 597a6a70..cdf7b35e 100644
--- a/gst/udp/gstudpsrc.h
+++ b/gst/udp/gstudpsrc.h
@@ -44,6 +44,7 @@ G_BEGIN_DECLS
(G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_UDPSRC))
#define GST_IS_UDPSRC_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_UDPSRC))
+#define GST_UDPSRC_CAST(obj) ((GstUDPSrc *)(obj))
typedef struct _GstUDPSrc GstUDPSrc;
typedef struct _GstUDPSrcClass GstUDPSrcClass;
@@ -62,6 +63,7 @@ struct _GstUDPSrc {
gint skip_first_bytes;
int sockfd;
gboolean closefd;
+ gboolean auto_multicast;
/* our sockets */
GstPollFD sock;