summaryrefslogtreecommitdiffstats
path: root/gst/udp/gstudpsrc.c
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@gmail.com>2005-05-12 15:32:51 +0000
committerWim Taymans <wim.taymans@gmail.com>2005-05-12 15:32:51 +0000
commit726c253c14a22711b72532441417a19a41a136e3 (patch)
tree63efae9ed46082c0b4432e73ebe9e067b6d10761 /gst/udp/gstudpsrc.c
parent2d1475d582c26be5308e57a1f6c552af2c8c6443 (diff)
gst/udp/: Added multifdsink to send UDP to multiple addresses.
Original commit message from CVS: * gst/udp/.cvsignore: * gst/udp/Makefile.am: * gst/udp/gstmultiudpsink.c: (gst_multiudpsink_get_type), (gst_multiudpsink_base_init), (gst_multiudpsink_class_init), (gst_multiudpsink_init), (gst_multiudpsink_finalize), (gst_multiudpsink_get_times), (gst_multiudpsink_render), (gst_multiudpsink_set_property), (gst_multiudpsink_get_property), (gst_multiudpsink_init_send), (gst_multiudpsink_close), (gst_multiudpsink_add), (gst_multiudpsink_remove), (gst_multiudpsink_clear), (gst_multiudpsink_get_stats), (gst_multiudpsink_change_state): * gst/udp/gstmultiudpsink.h: * gst/udp/gstudp-marshal.list: * gst/udp/gstudp.c: (plugin_init): * gst/udp/gstudp.h: * gst/udp/gstudpsink.c: (gst_udpsink_get_type), (gst_udpsink_base_init), (gst_udpsink_class_init), (gst_udpsink_init), (gst_udpsink_set_uri), (gst_udpsink_set_property), (gst_udpsink_get_property), (gst_udpsink_uri_get_type), (gst_udpsink_uri_get_protocols), (gst_udpsink_uri_get_uri), (gst_udpsink_uri_set_uri), (gst_udpsink_uri_handler_init): * gst/udp/gstudpsink.h: * gst/udp/gstudpsrc.c: (gst_udpsrc_get_type), (gst_udpsrc_base_init), (gst_udpsrc_class_init), (gst_udpsrc_create), (gst_udpsrc_set_uri), (gst_udpsrc_start), (gst_udpsrc_unlock), (gst_udpsrc_stop): * gst/udp/gstudpsrc.h: Added multifdsink to send UDP to multiple addresses. Cleaned up UDP source/sink elements some more. Make UDP sink extends from multiudpsink.
Diffstat (limited to 'gst/udp/gstudpsrc.c')
-rw-r--r--gst/udp/gstudpsrc.c230
1 files changed, 196 insertions, 34 deletions
diff --git a/gst/udp/gstudpsrc.c b/gst/udp/gstudpsrc.c
index 76c4b73a..ffec6a02 100644
--- a/gst/udp/gstudpsrc.c
+++ b/gst/udp/gstudpsrc.c
@@ -1,5 +1,5 @@
/* GStreamer
- * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
+ * Copyright (C) <2005> Wim Taymans <wim@fluendo.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
@@ -24,6 +24,33 @@
#include "gstudpsrc.h"
#include <unistd.h>
+#include <sys/ioctl.h>
+
+#ifdef HAVE_FIONREAD_IN_SYS_FILIO
+#include <sys/filio.h>
+#endif
+
+GST_DEBUG_CATEGORY (udpsrc_debug);
+#define GST_CAT_DEFAULT (udpsrc_debug)
+
+/* the select call is also performed on the control sockets, that way
+ * we can send special commands to unblock or restart the select call */
+#define CONTROL_RESTART 'R' /* restart the select call */
+#define CONTROL_STOP 'S' /* stop the select call */
+#define CONTROL_SOCKETS(src) src->control_sock
+#define WRITE_SOCKET(src) src->control_sock[1]
+#define READ_SOCKET(src) src->control_sock[0]
+
+#define SEND_COMMAND(src, command) \
+G_STMT_START { \
+ unsigned char c; c = command; \
+ write (WRITE_SOCKET(src), &c, 1); \
+} G_STMT_END
+
+#define READ_COMMAND(src, command, res) \
+G_STMT_START { \
+ res = read(READ_SOCKET(src), &command, 1); \
+} G_STMT_END
static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
@@ -35,7 +62,7 @@ static GstElementDetails gst_udpsrc_details =
GST_ELEMENT_DETAILS ("UDP packet receiver",
"Source/Network",
"Receive data over the network via UDP",
- "Wim Taymans <wim.taymans@chello.be>");
+ "Wim Taymans <wim@fluendo.com>");
/* UDPSrc signals and args */
enum
@@ -66,6 +93,7 @@ static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf);
static gboolean gst_udpsrc_start (GstBaseSrc * bsrc);
static gboolean gst_udpsrc_stop (GstBaseSrc * bsrc);
+static gboolean gst_udpsrc_unlock (GstBaseSrc * bsrc);
static void gst_udpsrc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
@@ -153,7 +181,11 @@ gst_udpsrc_class_init (GstUDPSrc * klass)
gstbasesrc_class->start = gst_udpsrc_start;
gstbasesrc_class->stop = gst_udpsrc_stop;
+ gstbasesrc_class->unlock = gst_udpsrc_unlock;
+
gstpushsrc_class->create = gst_udpsrc_create;
+
+ GST_DEBUG_CATEGORY_INIT (udpsrc_debug, "udpsrc", 0, "UDP src");
}
static void
@@ -172,30 +204,82 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
GstBuffer *outbuf;
struct sockaddr_in tmpaddr;
socklen_t len;
- gint numbytes;
fd_set read_fds;
guint max_sock;
gchar *pktdata;
gint pktsize;
+ gint readsize;
+ gint ret;
+ gboolean try_again;
udpsrc = GST_UDPSRC (psrc);
FD_ZERO (&read_fds);
FD_SET (udpsrc->sock, &read_fds);
- max_sock = udpsrc->sock;
+ FD_SET (READ_SOCKET (udpsrc), &read_fds);
+ max_sock = MAX (udpsrc->sock, READ_SOCKET (udpsrc));
- /* FIXME, add another socket to unblock */
- if (select (max_sock + 1, &read_fds, NULL, NULL, NULL) < 0)
- goto select_error;
+ do {
+ gboolean stop;
- pktdata = g_malloc (24000);
- pktsize = 24000;
+ try_again = FALSE;
+ stop = FALSE;
+
+ GST_LOG_OBJECT (udpsrc, "doing select");
+ ret = select (max_sock + 1, &read_fds, NULL, NULL, NULL);
+ GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
+ if (ret <= 0) {
+ if (errno != EAGAIN && errno != EINTR)
+ goto select_error;
+ else
+ try_again = TRUE;
+ } else {
+ /* got control message */
+ if (FD_ISSET (READ_SOCKET (udpsrc), &read_fds)) {
+ while (TRUE) {
+ gchar command;
+ int res;
+
+ READ_COMMAND (udpsrc, command, res);
+ if (res < 0) {
+ GST_LOG_OBJECT (udpsrc, "no more commands");
+ /* no more commands */
+ break;
+ }
+
+ switch (command) {
+ case CONTROL_STOP:
+ /* break out of the select loop */
+ GST_LOG_OBJECT (udpsrc, "stop");
+ /* stop this function */
+ stop = TRUE;
+ break;
+ default:
+ GST_WARNING_OBJECT (udpsrc, "unkown");
+ g_warning ("multiudpsink: unknown control message received");
+ break;
+ }
+ }
+ }
+ }
+ if (stop)
+ goto stopped;
+ } while (try_again);
+
+ /* ask how much is available for reading on the socket */
+ if ((ret = ioctl (udpsrc->sock, FIONREAD, &readsize)) < 0)
+ goto ioctl_failed;
+
+ GST_LOG_OBJECT (udpsrc, "ioctl says %d bytes available", readsize);
+
+ pktdata = g_malloc (readsize);
+ pktsize = readsize;
len = sizeof (struct sockaddr);
while (TRUE) {
- numbytes = recvfrom (udpsrc->sock, pktdata, pktsize,
+ ret = recvfrom (udpsrc->sock, pktdata, pktsize,
0, (struct sockaddr *) &tmpaddr, &len);
- if (numbytes < 0) {
+ if (ret < 0) {
if (errno != EAGAIN && errno != EINTR)
goto receive_error;
} else
@@ -204,7 +288,7 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
outbuf = gst_buffer_new ();
GST_BUFFER_DATA (outbuf) = pktdata;
- GST_BUFFER_SIZE (outbuf) = numbytes;
+ GST_BUFFER_SIZE (outbuf) = ret;
*buf = outbuf;
@@ -212,13 +296,26 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
select_error:
{
- GST_DEBUG ("got select error");
+ GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
+ ("select error %d: %s (%d)", ret, g_strerror (errno), errno));
+ return GST_FLOW_ERROR;
+ }
+stopped:
+ {
+ GST_DEBUG ("stop called");
+ return GST_FLOW_WRONG_STATE;
+ }
+ioctl_failed:
+ {
+ GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
+ ("ioctl failed %d: %s (%d)", ret, g_strerror (errno), errno));
return GST_FLOW_ERROR;
}
receive_error:
{
- gst_buffer_unref (outbuf);
- GST_DEBUG ("got receive error");
+ g_free (pktdata);
+ GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
+ ("receive error %d: %s (%d)", ret, g_strerror (errno), errno));
return GST_FLOW_ERROR;
}
}
@@ -250,7 +347,8 @@ gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri)
wrong_protocol:
{
g_free (protocol);
- GST_DEBUG ("error parsing uri %s", uri);
+ GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
+ ("error parsing uri %s: wrong protocol", uri));
return FALSE;
}
}
@@ -313,39 +411,53 @@ static gboolean
gst_udpsrc_start (GstBaseSrc * bsrc)
{
guint bc_val;
- gint reuse = 1;
+ gint reuse;
struct sockaddr_in my_addr;
int len, port;
GstUDPSrc *src;
+ gint ret;
src = GST_UDPSRC (bsrc);
- memset (&src->myaddr, 0, sizeof (src->myaddr));
- src->myaddr.sin_family = AF_INET; /* host byte order */
- src->myaddr.sin_port = htons (src->port); /* short, network byte order */
- src->myaddr.sin_addr.s_addr = INADDR_ANY;
+ if ((ret = socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (src))) < 0)
+ goto no_socket_pair;
- if ((src->sock = socket (AF_INET, SOCK_DGRAM, 0)) < 0)
+ fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
+ fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
+
+ if ((ret = socket (AF_INET, SOCK_DGRAM, 0)) < 0)
goto no_socket;
- if (setsockopt (src->sock, SOL_SOCKET, SO_REUSEADDR, &reuse,
- sizeof (reuse)) < 0)
+ src->sock = ret;
+
+ reuse = 1;
+ if ((ret =
+ setsockopt (src->sock, SOL_SOCKET, SO_REUSEADDR, &reuse,
+ sizeof (reuse))) < 0)
goto setsockopt_error;
- if (bind (src->sock, (struct sockaddr *) &src->myaddr,
- sizeof (src->myaddr)) < 0)
+ memset (&src->myaddr, 0, sizeof (src->myaddr));
+ src->myaddr.sin_family = AF_INET; /* host byte order */
+ src->myaddr.sin_port = htons (src->port); /* short, network byte order */
+ src->myaddr.sin_addr.s_addr = INADDR_ANY;
+
+ if ((ret =
+ bind (src->sock, (struct sockaddr *) &src->myaddr,
+ sizeof (src->myaddr))) < 0)
goto bind_error;
if (inet_aton (src->multi_group, &(src->multi_addr.imr_multiaddr))) {
if (src->multi_addr.imr_multiaddr.s_addr) {
src->multi_addr.imr_interface.s_addr = INADDR_ANY;
- setsockopt (src->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &src->multi_addr,
- sizeof (src->multi_addr));
+ if ((ret =
+ setsockopt (src->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
+ &src->multi_addr, sizeof (src->multi_addr))) < 0)
+ goto membership;
}
}
len = sizeof (my_addr);
- if (getsockname (src->sock, (struct sockaddr *) &my_addr, &len) < 0)
+ if ((ret = getsockname (src->sock, (struct sockaddr *) &my_addr, &len)) < 0)
goto getsockname_error;
port = ntohs (my_addr.sin_port);
@@ -355,35 +467,85 @@ gst_udpsrc_start (GstBaseSrc * bsrc)
}
bc_val = 1;
- setsockopt (src->sock, SOL_SOCKET, SO_BROADCAST, &bc_val, sizeof (bc_val));
+ if ((ret =
+ setsockopt (src->sock, SOL_SOCKET, SO_BROADCAST, &bc_val,
+ sizeof (bc_val))) < 0)
+ goto no_broadcast;
+
src->myaddr.sin_port = htons (src->port + 1);
return TRUE;
/* ERRORS */
+no_socket_pair:
+ {
+ GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
+ ("no socket pair %d: %s (%d)", ret, g_strerror (errno), errno));
+ return FALSE;
+ }
no_socket:
{
- GST_DEBUG ("no_socket");
+ GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
+ ("no socket error %d: %s (%d)", ret, g_strerror (errno), errno));
return FALSE;
}
setsockopt_error:
{
- GST_DEBUG ("setsockopt failed");
+ close (src->sock);
+ src->sock = -1;
+ GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
+ ("setsockopt failed %d: %s (%d)", ret, g_strerror (errno), errno));
return FALSE;
}
bind_error:
{
- GST_DEBUG ("bind failed");
+ close (src->sock);
+ src->sock = -1;
+ GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
+ ("bind failed %d: %s (%d)", ret, g_strerror (errno), errno));
+ return FALSE;
+ }
+membership:
+ {
+ close (src->sock);
+ src->sock = -1;
+ GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
+ ("could add membership %d: %s (%d)", ret, g_strerror (errno), errno));
return FALSE;
}
getsockname_error:
{
- GST_DEBUG ("getsockname failed");
+ close (src->sock);
+ src->sock = -1;
+ GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
+ ("getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno));
+ return FALSE;
+ }
+no_broadcast:
+ {
+ close (src->sock);
+ src->sock = -1;
+ GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
+ ("could not configure socket for broadcast %d: %s (%d)", ret,
+ g_strerror (errno), errno));
return FALSE;
}
}
static gboolean
+gst_udpsrc_unlock (GstBaseSrc * bsrc)
+{
+ GstUDPSrc *src;
+
+ src = GST_UDPSRC (bsrc);
+
+ GST_DEBUG ("sending stop command");
+ SEND_COMMAND (src, CONTROL_STOP);
+
+ return TRUE;
+}
+
+static gboolean
gst_udpsrc_stop (GstBaseSrc * bsrc)
{
GstUDPSrc *src;