summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorWim Taymans <wim.taymans@gmail.com>2005-05-12 10:45:25 +0000
committerWim Taymans <wim.taymans@gmail.com>2005-05-12 10:45:25 +0000
commit943f0445a965c319dcf31cc1e1839212aa479de4 (patch)
tree5b58509768b2bf74d1d172c6032ecfe2ad3a3ce3
parent99283f6e38c9faf77c41829a9d61dae8ca304f4d (diff)
gst/: Make UDP and TCP elements use PushSrc.
Original commit message from CVS: * gst/rtsp/README: * gst/tcp/gsttcpclientsrc.c: (gst_tcpclientsrc_get_type), (gst_tcpclientsrc_base_init), (gst_tcpclientsrc_class_init), (gst_tcpclientsrc_init), (gst_tcpclientsrc_getcaps), (gst_tcpclientsrc_stop), (gst_tcpclientsrc_eos), (gst_tcpclientsrc_create), (gst_tcpclientsrc_start): * gst/tcp/gsttcpclientsrc.h: * gst/tcp/gsttcpserversrc.c: (gst_tcpserversrc_get_type), (gst_tcpserversrc_base_init), (gst_tcpserversrc_class_init), (gst_tcpserversrc_init), (gst_tcpserversrc_create), (gst_tcpserversrc_start), (gst_tcpserversrc_stop): * gst/tcp/gsttcpserversrc.h: * gst/tcp/gsttcpsrc.c: (gst_tcpsrc_get_type), (gst_tcpsrc_base_init), (gst_tcpsrc_class_init), (gst_tcpsrc_init), (gst_tcpsrc_create), (gst_tcpsrc_start), (gst_tcpsrc_stop): * gst/tcp/gsttcpsrc.h: * gst/udp/gstudpsink.c: (gst_udpsink_base_init), (gst_udpsink_init), (gst_udpsink_get_times), (gst_udpsink_render), (gst_udpsink_set_property), (gst_udpsink_get_property), (gst_udpsink_change_state): * gst/udp/gstudpsink.h: * gst/udp/gstudpsrc.c: (gst_udpsrc_get_type), (gst_udpsrc_base_init), (gst_udpsrc_class_init), (gst_udpsrc_init), (gst_udpsrc_create), (gst_udpsrc_set_uri), (gst_udpsrc_start), (gst_udpsrc_stop): * gst/udp/gstudpsrc.h: Make UDP and TCP elements use PushSrc.
-rw-r--r--ChangeLog31
-rw-r--r--gst/rtsp/README2
-rw-r--r--gst/udp/gstudpsink.c41
-rw-r--r--gst/udp/gstudpsink.h1
-rw-r--r--gst/udp/gstudpsrc.c195
-rw-r--r--gst/udp/gstudpsrc.h19
6 files changed, 121 insertions, 168 deletions
diff --git a/ChangeLog b/ChangeLog
index 9589d817..0be548b2 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,34 @@
+2005-05-12 Wim Taymans <wim@fluendo.com>
+
+ * gst/rtsp/README:
+ * gst/tcp/gsttcpclientsrc.c: (gst_tcpclientsrc_get_type),
+ (gst_tcpclientsrc_base_init), (gst_tcpclientsrc_class_init),
+ (gst_tcpclientsrc_init), (gst_tcpclientsrc_getcaps),
+ (gst_tcpclientsrc_stop), (gst_tcpclientsrc_eos),
+ (gst_tcpclientsrc_create), (gst_tcpclientsrc_start):
+ * gst/tcp/gsttcpclientsrc.h:
+ * gst/tcp/gsttcpserversrc.c: (gst_tcpserversrc_get_type),
+ (gst_tcpserversrc_base_init), (gst_tcpserversrc_class_init),
+ (gst_tcpserversrc_init), (gst_tcpserversrc_create),
+ (gst_tcpserversrc_start), (gst_tcpserversrc_stop):
+ * gst/tcp/gsttcpserversrc.h:
+ * gst/tcp/gsttcpsrc.c: (gst_tcpsrc_get_type),
+ (gst_tcpsrc_base_init), (gst_tcpsrc_class_init), (gst_tcpsrc_init),
+ (gst_tcpsrc_create), (gst_tcpsrc_start), (gst_tcpsrc_stop):
+ * gst/tcp/gsttcpsrc.h:
+ * gst/udp/gstudpsink.c: (gst_udpsink_base_init),
+ (gst_udpsink_init), (gst_udpsink_get_times), (gst_udpsink_render),
+ (gst_udpsink_set_property), (gst_udpsink_get_property),
+ (gst_udpsink_change_state):
+ * gst/udp/gstudpsink.h:
+ * gst/udp/gstudpsrc.c: (gst_udpsrc_get_type),
+ (gst_udpsrc_base_init), (gst_udpsrc_class_init), (gst_udpsrc_init),
+ (gst_udpsrc_create), (gst_udpsrc_set_uri), (gst_udpsrc_start),
+ (gst_udpsrc_stop):
+ * gst/udp/gstudpsrc.h:
+ Make UDP and TCP elements use PushSrc.
+
+
2005-05-11 Tim-Philipp Müller <tim at centricular dot net>
* ext/mad/gstmad.c: (gst_mad_init), (gst_mad_src_query),
diff --git a/gst/rtsp/README b/gst/rtsp/README
index 62793a65..285f4f43 100644
--- a/gst/rtsp/README
+++ b/gst/rtsp/README
@@ -137,7 +137,7 @@ An RTSP session is created as follows:
+---------------------------------------------+
| +------------+ |
| | _loop() | +--------+ |
- | | ----- rtpdec --------------------
+ | | ----- rtpses --------------------
| | | | | |
| | | | | +------------+ |
| | ----- RTCP ---- udpsink | |
diff --git a/gst/udp/gstudpsink.c b/gst/udp/gstudpsink.c
index 93fb654b..0d57cd66 100644
--- a/gst/udp/gstudpsink.c
+++ b/gst/udp/gstudpsink.c
@@ -50,8 +50,7 @@ enum
ARG_0,
ARG_HOST,
ARG_PORT,
- ARG_MTU
- /* FILL ME */
+ /* FILL ME */
};
static void gst_udpsink_base_init (gpointer g_class);
@@ -146,7 +145,6 @@ gst_udpsink_init (GstUDPSink * udpsink)
{
udpsink->host = g_strdup (UDP_DEFAULT_HOST);
udpsink->port = UDP_DEFAULT_PORT;
- udpsink->mtu = 1024;
}
static void
@@ -161,30 +159,31 @@ static GstFlowReturn
gst_udpsink_render (GstBaseSink * sink, GstBuffer * buffer)
{
GstUDPSink *udpsink;
- gint tosend;
+ gint ret, size;
guint8 *data;
udpsink = GST_UDPSINK (sink);
- tosend = GST_BUFFER_SIZE (buffer);
+ size = GST_BUFFER_SIZE (buffer);
data = GST_BUFFER_DATA (buffer);
- /* send in chunks of MTU */
- while (tosend > 0) {
- gint psize;
-
- psize = MIN (udpsink->mtu, tosend);
- if (sendto (udpsink->sock, data, psize, 0,
- (struct sockaddr *) &udpsink->theiraddr,
- sizeof (udpsink->theiraddr)) == -1) {
- perror ("sending");
- }
+ while (TRUE) {
+ ret = sendto (udpsink->sock, data, size, 0,
+ (struct sockaddr *) &udpsink->theiraddr, sizeof (udpsink->theiraddr));
- data += psize;
- tosend -= psize;
+ if (ret < 0) {
+ if (errno != EINTR && errno != EAGAIN)
+ goto send_error;
+ } else
+ break;
}
-
return GST_FLOW_OK;
+
+send_error:
+ {
+ GST_DEBUG ("got send error");
+ return GST_FLOW_ERROR;
+ }
}
static void
@@ -207,9 +206,6 @@ gst_udpsink_set_property (GObject * object, guint prop_id, const GValue * value,
case ARG_PORT:
udpsink->port = g_value_get_int (value);
break;
- case ARG_MTU:
- udpsink->mtu = g_value_get_int (value);
- break;
default:
break;
}
@@ -230,9 +226,6 @@ gst_udpsink_get_property (GObject * object, guint prop_id, GValue * value,
case ARG_PORT:
g_value_set_int (value, udpsink->port);
break;
- case ARG_MTU:
- g_value_set_int (value, udpsink->mtu);
- break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
diff --git a/gst/udp/gstudpsink.h b/gst/udp/gstudpsink.h
index 0f65a254..e5480c34 100644
--- a/gst/udp/gstudpsink.h
+++ b/gst/udp/gstudpsink.h
@@ -59,7 +59,6 @@ struct _GstUDPSink {
gint port;
gchar *host;
- guint mtu;
};
struct _GstUDPSinkClass {
diff --git a/gst/udp/gstudpsrc.c b/gst/udp/gstudpsrc.c
index 120d0c55..76c4b73a 100644
--- a/gst/udp/gstudpsrc.c
+++ b/gst/udp/gstudpsrc.c
@@ -63,9 +63,9 @@ static void gst_udpsrc_init (GstUDPSrc * udpsrc);
static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);
-static void gst_udpsrc_loop (GstPad * pad);
-static GstElementStateReturn gst_udpsrc_change_state (GstElement * element);
-static gboolean gst_udpsrc_activate (GstPad * pad, GstActivateMode mode);
+static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf);
+static gboolean gst_udpsrc_start (GstBaseSrc * bsrc);
+static gboolean gst_udpsrc_stop (GstBaseSrc * bsrc);
static void gst_udpsrc_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
@@ -101,7 +101,7 @@ gst_udpsrc_get_type (void)
};
udpsrc_type =
- g_type_register_static (GST_TYPE_ELEMENT, "GstUDPSrc", &udpsrc_info, 0);
+ g_type_register_static (GST_TYPE_PUSHSRC, "GstUDPSrc", &udpsrc_info, 0);
g_type_add_interface_static (udpsrc_type, GST_TYPE_URI_HANDLER,
&urihandler_info);
@@ -125,11 +125,15 @@ gst_udpsrc_class_init (GstUDPSrc * klass)
{
GObjectClass *gobject_class;
GstElementClass *gstelement_class;
+ GstBaseSrcClass *gstbasesrc_class;
+ GstPushSrcClass *gstpushsrc_class;
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
+ gstbasesrc_class = (GstBaseSrcClass *) klass;
+ gstpushsrc_class = (GstPushSrcClass *) klass;
- parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
+ parent_class = g_type_class_ref (GST_TYPE_PUSHSRC);
gobject_class->set_property = gst_udpsrc_set_property;
gobject_class->get_property = gst_udpsrc_get_property;
@@ -147,27 +151,22 @@ gst_udpsrc_class_init (GstUDPSrc * klass)
"URI in the form of udp://hostname:port", UDP_DEFAULT_URI,
G_PARAM_READWRITE));
- gstelement_class->change_state = gst_udpsrc_change_state;
+ gstbasesrc_class->start = gst_udpsrc_start;
+ gstbasesrc_class->stop = gst_udpsrc_stop;
+ gstpushsrc_class->create = gst_udpsrc_create;
}
static void
gst_udpsrc_init (GstUDPSrc * udpsrc)
{
- /* create the src and src pads */
- udpsrc->srcpad = gst_pad_new_from_template
- (gst_static_pad_template_get (&src_template), "src");
- gst_pad_set_activate_function (udpsrc->srcpad, gst_udpsrc_activate);
- gst_pad_set_loop_function (udpsrc->srcpad, gst_udpsrc_loop);
- gst_element_add_pad (GST_ELEMENT (udpsrc), udpsrc->srcpad);
-
udpsrc->port = UDP_DEFAULT_PORT;
udpsrc->sock = -1;
udpsrc->multi_group = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
udpsrc->uri = g_strdup (UDP_DEFAULT_URI);
}
-static void
-gst_udpsrc_loop (GstPad * pad)
+static GstFlowReturn
+gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
{
GstUDPSrc *udpsrc;
GstBuffer *outbuf;
@@ -176,55 +175,51 @@ gst_udpsrc_loop (GstPad * pad)
gint numbytes;
fd_set read_fds;
guint max_sock;
+ gchar *pktdata;
+ gint pktsize;
- udpsrc = GST_UDPSRC (GST_OBJECT_PARENT (pad));
+ udpsrc = GST_UDPSRC (psrc);
FD_ZERO (&read_fds);
FD_SET (udpsrc->sock, &read_fds);
max_sock = udpsrc->sock;
- GST_STREAM_LOCK (pad);
-
/* FIXME, add another socket to unblock */
if (select (max_sock + 1, &read_fds, NULL, NULL, NULL) < 0)
goto select_error;
- outbuf = gst_buffer_new ();
- GST_BUFFER_DATA (outbuf) = g_malloc (24000);
- GST_BUFFER_SIZE (outbuf) = 24000;
+ pktdata = g_malloc (24000);
+ pktsize = 24000;
len = sizeof (struct sockaddr);
- if ((numbytes = recvfrom (udpsrc->sock, GST_BUFFER_DATA (outbuf),
- GST_BUFFER_SIZE (outbuf), 0, (struct sockaddr *) &tmpaddr,
- &len)) == -1)
- goto receive_error;
+ while (TRUE) {
+ numbytes = recvfrom (udpsrc->sock, pktdata, pktsize,
+ 0, (struct sockaddr *) &tmpaddr, &len);
+ if (numbytes < 0) {
+ if (errno != EAGAIN && errno != EINTR)
+ goto receive_error;
+ } else
+ break;
+ }
+ outbuf = gst_buffer_new ();
+ GST_BUFFER_DATA (outbuf) = pktdata;
GST_BUFFER_SIZE (outbuf) = numbytes;
- if (gst_pad_push (udpsrc->srcpad, outbuf) != GST_FLOW_OK)
- goto need_pause;
- GST_STREAM_UNLOCK (pad);
+ *buf = outbuf;
- return;
+ return GST_FLOW_OK;
select_error:
{
- GST_STREAM_UNLOCK (pad);
GST_DEBUG ("got select error");
- return;
+ return GST_FLOW_ERROR;
}
receive_error:
{
- GST_STREAM_UNLOCK (pad);
gst_buffer_unref (outbuf);
GST_DEBUG ("got receive error");
- return;
- }
-need_pause:
- {
- gst_task_pause (GST_RPAD_TASK (pad));
- GST_STREAM_UNLOCK (pad);
- return;
+ return GST_FLOW_ERROR;
}
}
@@ -315,12 +310,15 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
/* create a socket for sending to remote machine */
static gboolean
-gst_udpsrc_init_receive (GstUDPSrc * src)
+gst_udpsrc_start (GstBaseSrc * bsrc)
{
guint bc_val;
gint reuse = 1;
struct sockaddr_in my_addr;
int len, port;
+ GstUDPSrc *src;
+
+ src = GST_UDPSRC (bsrc);
memset (&src->myaddr, 0, sizeof (src->myaddr));
src->myaddr.sin_family = AF_INET; /* host byte order */
@@ -328,15 +326,15 @@ gst_udpsrc_init_receive (GstUDPSrc * src)
src->myaddr.sin_addr.s_addr = INADDR_ANY;
if ((src->sock = socket (AF_INET, SOCK_DGRAM, 0)) < 0)
- goto error;
+ goto no_socket;
if (setsockopt (src->sock, SOL_SOCKET, SO_REUSEADDR, &reuse,
sizeof (reuse)) < 0)
- goto error;
+ goto setsockopt_error;
if (bind (src->sock, (struct sockaddr *) &src->myaddr,
sizeof (src->myaddr)) < 0)
- goto error;
+ goto bind_error;
if (inet_aton (src->multi_group, &(src->multi_addr.imr_multiaddr))) {
if (src->multi_addr.imr_multiaddr.s_addr) {
@@ -347,7 +345,9 @@ gst_udpsrc_init_receive (GstUDPSrc * src)
}
len = sizeof (my_addr);
- getsockname (src->sock, (struct sockaddr *) &my_addr, &len);
+ if (getsockname (src->sock, (struct sockaddr *) &my_addr, &len) < 0)
+ goto getsockname_error;
+
port = ntohs (my_addr.sin_port);
if (port != src->port) {
src->port = port;
@@ -360,102 +360,41 @@ gst_udpsrc_init_receive (GstUDPSrc * src)
return TRUE;
-error:
+ /* ERRORS */
+no_socket:
{
- perror ("open");
+ GST_DEBUG ("no_socket");
return FALSE;
}
-}
-
-static void
-gst_udpsrc_close (GstUDPSrc * src)
-{
- if (src->sock != -1) {
- close (src->sock);
- src->sock = -1;
+setsockopt_error:
+ {
+ GST_DEBUG ("setsockopt failed");
+ return FALSE;
}
-}
-
-static gboolean
-gst_udpsrc_activate (GstPad * pad, GstActivateMode mode)
-{
- gboolean result;
- GstUDPSrc *udpsrc;
-
- udpsrc = GST_UDPSRC (GST_OBJECT_PARENT (pad));
-
- switch (mode) {
- case GST_ACTIVATE_PUSH:
- /* if we have a scheduler we can start the task */
- if (GST_ELEMENT_SCHEDULER (udpsrc)) {
- GST_STREAM_LOCK (pad);
- GST_RPAD_TASK (pad) =
- gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (udpsrc),
- (GstTaskFunction) gst_udpsrc_loop, pad);
-
- gst_task_start (GST_RPAD_TASK (pad));
- GST_STREAM_UNLOCK (pad);
- result = TRUE;
- }
- break;
- case GST_ACTIVATE_PULL:
- result = FALSE;
- break;
- case GST_ACTIVATE_NONE:
- /* step 1, unblock clock sync (if any) */
-
- /* step 2, make sure streaming finishes */
- GST_STREAM_LOCK (pad);
- gst_udpsrc_close (udpsrc);
-
- /* step 3, stop the task */
- if (GST_RPAD_TASK (pad)) {
- gst_task_stop (GST_RPAD_TASK (pad));
- gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
- GST_RPAD_TASK (pad) = NULL;
- }
- GST_STREAM_UNLOCK (pad);
-
- result = TRUE;
- break;
+bind_error:
+ {
+ GST_DEBUG ("bind failed");
+ return FALSE;
+ }
+getsockname_error:
+ {
+ GST_DEBUG ("getsockname failed");
+ return FALSE;
}
- return result;
}
-static GstElementStateReturn
-gst_udpsrc_change_state (GstElement * element)
+static gboolean
+gst_udpsrc_stop (GstBaseSrc * bsrc)
{
- GstElementStateReturn ret;
GstUDPSrc *src;
- gint transition;
- src = GST_UDPSRC (element);
+ src = GST_UDPSRC (bsrc);
- transition = GST_STATE_TRANSITION (element);
-
- switch (transition) {
- case GST_STATE_READY_TO_PAUSED:
- if (!gst_udpsrc_init_receive (src))
- goto no_init;
- break;
- default:
- break;
- }
-
- ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
-
- switch (transition) {
- default:
- break;
- }
-
- return ret;
-
-no_init:
- {
- GST_DEBUG ("could not init udp socket");
- return GST_STATE_FAILURE;
+ if (src->sock != -1) {
+ close (src->sock);
+ src->sock = -1;
}
+ return TRUE;
}
/*** GSTURIHANDLER INTERFACE *************************************************/
diff --git a/gst/udp/gstudpsrc.h b/gst/udp/gstudpsrc.h
index a347eb2a..c39cbe71 100644
--- a/gst/udp/gstudpsrc.h
+++ b/gst/udp/gstudpsrc.h
@@ -22,10 +22,9 @@
#define __GST_UDPSRC_H__
#include <gst/gst.h>
+#include <gst/base/gstpushsrc.h>
-#ifdef __cplusplus
-extern "C" {
-#endif /* __cplusplus */
+G_BEGIN_DECLS
#include <errno.h>
#include <string.h>
@@ -53,14 +52,9 @@ typedef struct _GstUDPSrc GstUDPSrc;
typedef struct _GstUDPSrcClass GstUDPSrcClass;
struct _GstUDPSrc {
- GstElement element;
-
- /* pads */
- GstPad *sinkpad,
- *srcpad;
+ GstPushSrc parent;
gchar *uri;
-
int port;
int sock;
gchar *multi_group;
@@ -74,15 +68,12 @@ struct _GstUDPSrc {
};
struct _GstUDPSrcClass {
- GstElementClass parent_class;
+ GstPushSrcClass parent_class;
};
GType gst_udpsrc_get_type(void);
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
+G_END_DECLS
#endif /* __GST_UDPSRC_H__ */