diff options
author | Wim Taymans <wim.taymans@gmail.com> | 2005-05-12 15:32:51 +0000 |
---|---|---|
committer | Wim Taymans <wim.taymans@gmail.com> | 2005-05-12 15:32:51 +0000 |
commit | 726c253c14a22711b72532441417a19a41a136e3 (patch) | |
tree | 63efae9ed46082c0b4432e73ebe9e067b6d10761 /gst/udp/gstudpsrc.c | |
parent | 2d1475d582c26be5308e57a1f6c552af2c8c6443 (diff) |
gst/udp/: Added multifdsink to send UDP to multiple addresses.
Original commit message from CVS:
* gst/udp/.cvsignore:
* gst/udp/Makefile.am:
* gst/udp/gstmultiudpsink.c: (gst_multiudpsink_get_type),
(gst_multiudpsink_base_init), (gst_multiudpsink_class_init),
(gst_multiudpsink_init), (gst_multiudpsink_finalize),
(gst_multiudpsink_get_times), (gst_multiudpsink_render),
(gst_multiudpsink_set_property), (gst_multiudpsink_get_property),
(gst_multiudpsink_init_send), (gst_multiudpsink_close),
(gst_multiudpsink_add), (gst_multiudpsink_remove),
(gst_multiudpsink_clear), (gst_multiudpsink_get_stats),
(gst_multiudpsink_change_state):
* gst/udp/gstmultiudpsink.h:
* gst/udp/gstudp-marshal.list:
* gst/udp/gstudp.c: (plugin_init):
* gst/udp/gstudp.h:
* gst/udp/gstudpsink.c: (gst_udpsink_get_type),
(gst_udpsink_base_init), (gst_udpsink_class_init),
(gst_udpsink_init), (gst_udpsink_set_uri),
(gst_udpsink_set_property), (gst_udpsink_get_property),
(gst_udpsink_uri_get_type), (gst_udpsink_uri_get_protocols),
(gst_udpsink_uri_get_uri), (gst_udpsink_uri_set_uri),
(gst_udpsink_uri_handler_init):
* gst/udp/gstudpsink.h:
* gst/udp/gstudpsrc.c: (gst_udpsrc_get_type),
(gst_udpsrc_base_init), (gst_udpsrc_class_init),
(gst_udpsrc_create), (gst_udpsrc_set_uri), (gst_udpsrc_start),
(gst_udpsrc_unlock), (gst_udpsrc_stop):
* gst/udp/gstudpsrc.h:
Added multifdsink to send UDP to multiple addresses.
Cleaned up UDP source/sink elements some more.
Make UDP sink extends from multiudpsink.
Diffstat (limited to 'gst/udp/gstudpsrc.c')
-rw-r--r-- | gst/udp/gstudpsrc.c | 230 |
1 files changed, 196 insertions, 34 deletions
diff --git a/gst/udp/gstudpsrc.c b/gst/udp/gstudpsrc.c index 76c4b73a..ffec6a02 100644 --- a/gst/udp/gstudpsrc.c +++ b/gst/udp/gstudpsrc.c @@ -1,5 +1,5 @@ /* GStreamer - * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> + * Copyright (C) <2005> Wim Taymans <wim@fluendo.com> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -24,6 +24,33 @@ #include "gstudpsrc.h" #include <unistd.h> +#include <sys/ioctl.h> + +#ifdef HAVE_FIONREAD_IN_SYS_FILIO +#include <sys/filio.h> +#endif + +GST_DEBUG_CATEGORY (udpsrc_debug); +#define GST_CAT_DEFAULT (udpsrc_debug) + +/* the select call is also performed on the control sockets, that way + * we can send special commands to unblock or restart the select call */ +#define CONTROL_RESTART 'R' /* restart the select call */ +#define CONTROL_STOP 'S' /* stop the select call */ +#define CONTROL_SOCKETS(src) src->control_sock +#define WRITE_SOCKET(src) src->control_sock[1] +#define READ_SOCKET(src) src->control_sock[0] + +#define SEND_COMMAND(src, command) \ +G_STMT_START { \ + unsigned char c; c = command; \ + write (WRITE_SOCKET(src), &c, 1); \ +} G_STMT_END + +#define READ_COMMAND(src, command, res) \ +G_STMT_START { \ + res = read(READ_SOCKET(src), &command, 1); \ +} G_STMT_END static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, @@ -35,7 +62,7 @@ static GstElementDetails gst_udpsrc_details = GST_ELEMENT_DETAILS ("UDP packet receiver", "Source/Network", "Receive data over the network via UDP", - "Wim Taymans <wim.taymans@chello.be>"); + "Wim Taymans <wim@fluendo.com>"); /* UDPSrc signals and args */ enum @@ -66,6 +93,7 @@ static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data); static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf); static gboolean gst_udpsrc_start (GstBaseSrc * bsrc); static gboolean gst_udpsrc_stop (GstBaseSrc * bsrc); +static gboolean gst_udpsrc_unlock (GstBaseSrc * bsrc); static void gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); @@ -153,7 +181,11 @@ gst_udpsrc_class_init (GstUDPSrc * klass) gstbasesrc_class->start = gst_udpsrc_start; gstbasesrc_class->stop = gst_udpsrc_stop; + gstbasesrc_class->unlock = gst_udpsrc_unlock; + gstpushsrc_class->create = gst_udpsrc_create; + + GST_DEBUG_CATEGORY_INIT (udpsrc_debug, "udpsrc", 0, "UDP src"); } static void @@ -172,30 +204,82 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf) GstBuffer *outbuf; struct sockaddr_in tmpaddr; socklen_t len; - gint numbytes; fd_set read_fds; guint max_sock; gchar *pktdata; gint pktsize; + gint readsize; + gint ret; + gboolean try_again; udpsrc = GST_UDPSRC (psrc); FD_ZERO (&read_fds); FD_SET (udpsrc->sock, &read_fds); - max_sock = udpsrc->sock; + FD_SET (READ_SOCKET (udpsrc), &read_fds); + max_sock = MAX (udpsrc->sock, READ_SOCKET (udpsrc)); - /* FIXME, add another socket to unblock */ - if (select (max_sock + 1, &read_fds, NULL, NULL, NULL) < 0) - goto select_error; + do { + gboolean stop; - pktdata = g_malloc (24000); - pktsize = 24000; + try_again = FALSE; + stop = FALSE; + + GST_LOG_OBJECT (udpsrc, "doing select"); + ret = select (max_sock + 1, &read_fds, NULL, NULL, NULL); + GST_LOG_OBJECT (udpsrc, "select returned %d", ret); + if (ret <= 0) { + if (errno != EAGAIN && errno != EINTR) + goto select_error; + else + try_again = TRUE; + } else { + /* got control message */ + if (FD_ISSET (READ_SOCKET (udpsrc), &read_fds)) { + while (TRUE) { + gchar command; + int res; + + READ_COMMAND (udpsrc, command, res); + if (res < 0) { + GST_LOG_OBJECT (udpsrc, "no more commands"); + /* no more commands */ + break; + } + + switch (command) { + case CONTROL_STOP: + /* break out of the select loop */ + GST_LOG_OBJECT (udpsrc, "stop"); + /* stop this function */ + stop = TRUE; + break; + default: + GST_WARNING_OBJECT (udpsrc, "unkown"); + g_warning ("multiudpsink: unknown control message received"); + break; + } + } + } + } + if (stop) + goto stopped; + } while (try_again); + + /* ask how much is available for reading on the socket */ + if ((ret = ioctl (udpsrc->sock, FIONREAD, &readsize)) < 0) + goto ioctl_failed; + + GST_LOG_OBJECT (udpsrc, "ioctl says %d bytes available", readsize); + + pktdata = g_malloc (readsize); + pktsize = readsize; len = sizeof (struct sockaddr); while (TRUE) { - numbytes = recvfrom (udpsrc->sock, pktdata, pktsize, + ret = recvfrom (udpsrc->sock, pktdata, pktsize, 0, (struct sockaddr *) &tmpaddr, &len); - if (numbytes < 0) { + if (ret < 0) { if (errno != EAGAIN && errno != EINTR) goto receive_error; } else @@ -204,7 +288,7 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf) outbuf = gst_buffer_new (); GST_BUFFER_DATA (outbuf) = pktdata; - GST_BUFFER_SIZE (outbuf) = numbytes; + GST_BUFFER_SIZE (outbuf) = ret; *buf = outbuf; @@ -212,13 +296,26 @@ gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf) select_error: { - GST_DEBUG ("got select error"); + GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL), + ("select error %d: %s (%d)", ret, g_strerror (errno), errno)); + return GST_FLOW_ERROR; + } +stopped: + { + GST_DEBUG ("stop called"); + return GST_FLOW_WRONG_STATE; + } +ioctl_failed: + { + GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL), + ("ioctl failed %d: %s (%d)", ret, g_strerror (errno), errno)); return GST_FLOW_ERROR; } receive_error: { - gst_buffer_unref (outbuf); - GST_DEBUG ("got receive error"); + g_free (pktdata); + GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL), + ("receive error %d: %s (%d)", ret, g_strerror (errno), errno)); return GST_FLOW_ERROR; } } @@ -250,7 +347,8 @@ gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri) wrong_protocol: { g_free (protocol); - GST_DEBUG ("error parsing uri %s", uri); + GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), + ("error parsing uri %s: wrong protocol", uri)); return FALSE; } } @@ -313,39 +411,53 @@ static gboolean gst_udpsrc_start (GstBaseSrc * bsrc) { guint bc_val; - gint reuse = 1; + gint reuse; struct sockaddr_in my_addr; int len, port; GstUDPSrc *src; + gint ret; src = GST_UDPSRC (bsrc); - memset (&src->myaddr, 0, sizeof (src->myaddr)); - src->myaddr.sin_family = AF_INET; /* host byte order */ - src->myaddr.sin_port = htons (src->port); /* short, network byte order */ - src->myaddr.sin_addr.s_addr = INADDR_ANY; + if ((ret = socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (src))) < 0) + goto no_socket_pair; - if ((src->sock = socket (AF_INET, SOCK_DGRAM, 0)) < 0) + fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK); + fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK); + + if ((ret = socket (AF_INET, SOCK_DGRAM, 0)) < 0) goto no_socket; - if (setsockopt (src->sock, SOL_SOCKET, SO_REUSEADDR, &reuse, - sizeof (reuse)) < 0) + src->sock = ret; + + reuse = 1; + if ((ret = + setsockopt (src->sock, SOL_SOCKET, SO_REUSEADDR, &reuse, + sizeof (reuse))) < 0) goto setsockopt_error; - if (bind (src->sock, (struct sockaddr *) &src->myaddr, - sizeof (src->myaddr)) < 0) + memset (&src->myaddr, 0, sizeof (src->myaddr)); + src->myaddr.sin_family = AF_INET; /* host byte order */ + src->myaddr.sin_port = htons (src->port); /* short, network byte order */ + src->myaddr.sin_addr.s_addr = INADDR_ANY; + + if ((ret = + bind (src->sock, (struct sockaddr *) &src->myaddr, + sizeof (src->myaddr))) < 0) goto bind_error; if (inet_aton (src->multi_group, &(src->multi_addr.imr_multiaddr))) { if (src->multi_addr.imr_multiaddr.s_addr) { src->multi_addr.imr_interface.s_addr = INADDR_ANY; - setsockopt (src->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &src->multi_addr, - sizeof (src->multi_addr)); + if ((ret = + setsockopt (src->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, + &src->multi_addr, sizeof (src->multi_addr))) < 0) + goto membership; } } len = sizeof (my_addr); - if (getsockname (src->sock, (struct sockaddr *) &my_addr, &len) < 0) + if ((ret = getsockname (src->sock, (struct sockaddr *) &my_addr, &len)) < 0) goto getsockname_error; port = ntohs (my_addr.sin_port); @@ -355,35 +467,85 @@ gst_udpsrc_start (GstBaseSrc * bsrc) } bc_val = 1; - setsockopt (src->sock, SOL_SOCKET, SO_BROADCAST, &bc_val, sizeof (bc_val)); + if ((ret = + setsockopt (src->sock, SOL_SOCKET, SO_BROADCAST, &bc_val, + sizeof (bc_val))) < 0) + goto no_broadcast; + src->myaddr.sin_port = htons (src->port + 1); return TRUE; /* ERRORS */ +no_socket_pair: + { + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), + ("no socket pair %d: %s (%d)", ret, g_strerror (errno), errno)); + return FALSE; + } no_socket: { - GST_DEBUG ("no_socket"); + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), + ("no socket error %d: %s (%d)", ret, g_strerror (errno), errno)); return FALSE; } setsockopt_error: { - GST_DEBUG ("setsockopt failed"); + close (src->sock); + src->sock = -1; + GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL), + ("setsockopt failed %d: %s (%d)", ret, g_strerror (errno), errno)); return FALSE; } bind_error: { - GST_DEBUG ("bind failed"); + close (src->sock); + src->sock = -1; + GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL), + ("bind failed %d: %s (%d)", ret, g_strerror (errno), errno)); + return FALSE; + } +membership: + { + close (src->sock); + src->sock = -1; + GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL), + ("could add membership %d: %s (%d)", ret, g_strerror (errno), errno)); return FALSE; } getsockname_error: { - GST_DEBUG ("getsockname failed"); + close (src->sock); + src->sock = -1; + GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL), + ("getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno)); + return FALSE; + } +no_broadcast: + { + close (src->sock); + src->sock = -1; + GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL), + ("could not configure socket for broadcast %d: %s (%d)", ret, + g_strerror (errno), errno)); return FALSE; } } static gboolean +gst_udpsrc_unlock (GstBaseSrc * bsrc) +{ + GstUDPSrc *src; + + src = GST_UDPSRC (bsrc); + + GST_DEBUG ("sending stop command"); + SEND_COMMAND (src, CONTROL_STOP); + + return TRUE; +} + +static gboolean gst_udpsrc_stop (GstBaseSrc * bsrc) { GstUDPSrc *src; |