diff options
author | Wim Taymans <wim.taymans@gmail.com> | 2005-05-10 11:15:13 +0000 |
---|---|---|
committer | Wim Taymans <wim.taymans@gmail.com> | 2005-05-10 11:15:13 +0000 |
commit | 4eb7e48ab13d029f96177d92aa72edda02527962 (patch) | |
tree | 61b13fdf85f35493d72642509ee4013bf6cbe0b4 | |
parent | 1987170ab6dc7bc07553f766bda3393021246e14 (diff) |
gst/udp/: Ported udp src/sink.
Original commit message from CVS:
* gst/udp/Makefile.am:
* gst/udp/gstudpsink.c: (gst_udpsink_get_type),
(gst_udpsink_base_init), (gst_udpsink_class_init),
(gst_udpsink_init), (gst_udpsink_get_times), (gst_udpsink_render),
(gst_udpsink_set_property), (gst_udpsink_get_property),
(gst_udpsink_init_send), (gst_udpsink_close),
(gst_udpsink_change_state):
* gst/udp/gstudpsink.h:
* gst/udp/gstudpsrc.c: (gst_udpsrc_base_init),
(gst_udpsrc_class_init), (gst_udpsrc_init), (gst_udpsrc_loop),
(gst_udpsrc_set_property), (gst_udpsrc_get_property),
(gst_udpsrc_init_receive), (gst_udpsrc_close),
(gst_udpsrc_activate), (gst_udpsrc_change_state):
* gst/udp/gstudpsrc.h:
Ported udp src/sink.
-rw-r--r-- | ChangeLog | 18 | ||||
-rw-r--r-- | gst/udp/Makefile.am | 2 | ||||
-rw-r--r-- | gst/udp/gstudpsink.c | 274 | ||||
-rw-r--r-- | gst/udp/gstudpsink.h | 45 | ||||
-rw-r--r-- | gst/udp/gstudpsrc.c | 360 | ||||
-rw-r--r-- | gst/udp/gstudpsrc.h | 12 |
6 files changed, 218 insertions, 493 deletions
@@ -1,3 +1,21 @@ +2005-05-10 Wim Taymans <wim@fluendo.com> + + * gst/udp/Makefile.am: + * gst/udp/gstudpsink.c: (gst_udpsink_get_type), + (gst_udpsink_base_init), (gst_udpsink_class_init), + (gst_udpsink_init), (gst_udpsink_get_times), (gst_udpsink_render), + (gst_udpsink_set_property), (gst_udpsink_get_property), + (gst_udpsink_init_send), (gst_udpsink_close), + (gst_udpsink_change_state): + * gst/udp/gstudpsink.h: + * gst/udp/gstudpsrc.c: (gst_udpsrc_base_init), + (gst_udpsrc_class_init), (gst_udpsrc_init), (gst_udpsrc_loop), + (gst_udpsrc_set_property), (gst_udpsrc_get_property), + (gst_udpsrc_init_receive), (gst_udpsrc_close), + (gst_udpsrc_activate), (gst_udpsrc_change_state): + * gst/udp/gstudpsrc.h: + Ported udp src/sink. + 2005-05-09 Zaheer Abbas Merali <zaheerabbas at merali dot org> * PORTED_09: diff --git a/gst/udp/Makefile.am b/gst/udp/Makefile.am index 3871d714..b888c067 100644 --- a/gst/udp/Makefile.am +++ b/gst/udp/Makefile.am @@ -4,7 +4,7 @@ plugin_LTLIBRARIES = libgstudp.la libgstudp_la_SOURCES = gstudp.c gstudpsrc.c gstudpsink.c libgstudp_la_CFLAGS = $(GST_CFLAGS) libgstudp_la_LIBADD = -libgstudp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) +libgstudp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) $(GST_BASE_LIBS) noinst_HEADERS = gstudpsink.h gstudpsrc.h gstudp.h diff --git a/gst/udp/gstudpsink.c b/gst/udp/gstudpsink.c index 7de10b0f..7f9206f9 100644 --- a/gst/udp/gstudpsink.c +++ b/gst/udp/gstudpsink.c @@ -25,7 +25,11 @@ #define UDP_DEFAULT_HOST "localhost" #define UDP_DEFAULT_PORT 4951 -#define UDP_DEFAULT_CONTROL 1 + +static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); /* elementfactory information */ static GstElementDetails gst_udpsink_details = @@ -37,7 +41,6 @@ GST_ELEMENT_DETAILS ("UDP packet sender", /* UDPSink signals and args */ enum { - FRAME_ENCODED, /* FILL ME */ LAST_SIGNAL }; @@ -47,37 +50,18 @@ enum ARG_0, ARG_HOST, ARG_PORT, - ARG_CONTROL, ARG_MTU /* FILL ME */ }; -#define GST_TYPE_UDPSINK_CONTROL (gst_udpsink_control_get_type()) -static GType -gst_udpsink_control_get_type (void) -{ - static GType udpsink_control_type = 0; - static GEnumValue udpsink_control[] = { - {CONTROL_NONE, "1", "none"}, - {CONTROL_UDP, "2", "udp"}, - {CONTROL_TCP, "3", "tcp"}, - {CONTROL_ZERO, NULL, NULL}, - }; - - if (!udpsink_control_type) { - udpsink_control_type = - g_enum_register_static ("GstUDPSinkControl", udpsink_control); - } - return udpsink_control_type; -} - static void gst_udpsink_base_init (gpointer g_class); static void gst_udpsink_class_init (GstUDPSink * klass); static void gst_udpsink_init (GstUDPSink * udpsink); -static void gst_udpsink_set_clock (GstElement * element, GstClock * clock); - -static void gst_udpsink_chain (GstPad * pad, GstData * _data); +static void gst_udpsink_get_times (GstBaseSink * sink, GstBuffer * buffer, + GstClockTime * start, GstClockTime * end); +static GstFlowReturn gst_udpsink_render (GstBaseSink * sink, + GstBuffer * buffer); static GstElementStateReturn gst_udpsink_change_state (GstElement * element); static void gst_udpsink_set_property (GObject * object, guint prop_id, @@ -85,7 +69,6 @@ static void gst_udpsink_set_property (GObject * object, guint prop_id, static void gst_udpsink_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); - static GstElementClass *parent_class = NULL; /*static guint gst_udpsink_signals[LAST_SIGNAL] = { 0 }; */ @@ -110,7 +93,7 @@ gst_udpsink_get_type (void) }; udpsink_type = - g_type_register_static (GST_TYPE_ELEMENT, "GstUDPSink", &udpsink_info, + g_type_register_static (GST_TYPE_BASESINK, "GstUDPSink", &udpsink_info, 0); } return udpsink_type; @@ -121,6 +104,9 @@ gst_udpsink_base_init (gpointer g_class) { GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&sink_template)); + gst_element_class_set_details (element_class, &gst_udpsink_details); } @@ -129,11 +115,16 @@ gst_udpsink_class_init (GstUDPSink * klass) { GObjectClass *gobject_class; GstElementClass *gstelement_class; + GstBaseSinkClass *gstbasesink_class; gobject_class = (GObjectClass *) klass; gstelement_class = (GstElementClass *) klass; + gstbasesink_class = (GstBaseSinkClass *) klass; - parent_class = g_type_class_ref (GST_TYPE_ELEMENT); + parent_class = g_type_class_ref (GST_TYPE_BASESINK); + + gobject_class->set_property = gst_udpsink_set_property; + gobject_class->get_property = gst_udpsink_get_property; g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_HOST, g_param_spec_string ("host", "host", @@ -142,176 +133,57 @@ gst_udpsink_class_init (GstUDPSink * klass) g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT, g_param_spec_int ("port", "port", "The port to send the packets to", 0, 32768, UDP_DEFAULT_PORT, G_PARAM_READWRITE)); - g_object_class_install_property (gobject_class, ARG_CONTROL, - g_param_spec_enum ("control", "control", "The type of control", - GST_TYPE_UDPSINK_CONTROL, CONTROL_UDP, G_PARAM_READWRITE)); - g_object_class_install_property (gobject_class, ARG_MTU, g_param_spec_int ("mtu", "mtu", "maximun transmit unit", G_MININT, G_MAXINT, 0, G_PARAM_READWRITE)); /* CHECKME */ - - gobject_class->set_property = gst_udpsink_set_property; - gobject_class->get_property = gst_udpsink_get_property; gstelement_class->change_state = gst_udpsink_change_state; - gstelement_class->set_clock = gst_udpsink_set_clock; -} - -static GstPadLinkReturn -gst_udpsink_sink_link (GstPad * pad, const GstCaps * caps) -{ -#ifndef GST_DISABLE_LOADSAVE - GstUDPSink *udpsink; - struct sockaddr_in serv_addr; - struct hostent *serverhost; - int fd; - FILE *f; - guint bc_val; - - xmlDocPtr doc; - xmlChar *buf; - int buf_size; - - udpsink = GST_UDPSINK (gst_pad_get_parent (pad)); - - memset (&serv_addr, 0, sizeof (serv_addr)); - - /* its a name rather than an ipnum */ - serverhost = gethostbyname (udpsink->host); - if (serverhost == (struct hostent *) 0) { - perror ("gethostbyname"); - return GST_PAD_LINK_REFUSED; - } - - memmove (&serv_addr.sin_addr, serverhost->h_addr, serverhost->h_length); - - serv_addr.sin_family = AF_INET; - serv_addr.sin_port = htons (udpsink->port + 1); - - doc = xmlNewDoc ("1.0"); - doc->xmlRootNode = xmlNewDocNode (doc, NULL, "NewCaps", NULL); - - gst_caps_save_thyself (caps, doc->xmlRootNode); - - switch (udpsink->control) { - case CONTROL_UDP: - if ((fd = socket (AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) { - perror ("socket"); - return GST_PAD_LINK_REFUSED; - } - - /* We can only do broadcast in udp */ - bc_val = 1; - setsockopt (fd, SOL_SOCKET, SO_BROADCAST, &bc_val, sizeof (bc_val)); - - xmlDocDumpMemory (doc, &buf, &buf_size); - - if (sendto (fd, buf, buf_size, 0, (struct sockaddr *) &serv_addr, - sizeof (serv_addr)) == -1) { - perror ("sending"); - return GST_PAD_LINK_REFUSED; - } - close (fd); - break; - case CONTROL_TCP: - if ((fd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP)) == -1) { - perror ("socket"); - return GST_PAD_LINK_REFUSED; - } - - if (connect (fd, (struct sockaddr *) &serv_addr, sizeof (serv_addr)) != 0) { - g_printerr ("udpsink: connect to %s port %d failed: %s\n", - udpsink->host, udpsink->port, g_strerror (errno)); - return GST_PAD_LINK_REFUSED; - } - - f = fdopen (dup (fd), "wb"); - - xmlDocDump (f, doc); - fclose (f); - close (fd); - break; - case CONTROL_NONE: - return GST_PAD_LINK_OK; - break; - default: - return GST_PAD_LINK_REFUSED; - break; - } -#endif - - return GST_PAD_LINK_OK; + gstbasesink_class->get_times = gst_udpsink_get_times; + gstbasesink_class->render = gst_udpsink_render; } -static void -gst_udpsink_set_clock (GstElement * element, GstClock * clock) -{ - GstUDPSink *udpsink; - - udpsink = GST_UDPSINK (element); - - udpsink->clock = clock; -} static void gst_udpsink_init (GstUDPSink * udpsink) { - /* create the sink and src pads */ - udpsink->sinkpad = gst_pad_new ("sink", GST_PAD_SINK); - gst_element_add_pad (GST_ELEMENT (udpsink), udpsink->sinkpad); - gst_pad_set_chain_function (udpsink->sinkpad, gst_udpsink_chain); - gst_pad_set_link_function (udpsink->sinkpad, gst_udpsink_sink_link); - udpsink->host = g_strdup (UDP_DEFAULT_HOST); udpsink->port = UDP_DEFAULT_PORT; - udpsink->control = CONTROL_UDP; udpsink->mtu = 1024; - - udpsink->clock = NULL; } static void -gst_udpsink_chain (GstPad * pad, GstData * _data) +gst_udpsink_get_times (GstBaseSink * sink, GstBuffer * buffer, + GstClockTime * start, GstClockTime * end) +{ + *start = GST_BUFFER_TIMESTAMP (buffer); + *end = *start + GST_BUFFER_DURATION (buffer); +} + +static GstFlowReturn +gst_udpsink_render (GstBaseSink * sink, GstBuffer * buffer) { - GstBuffer *buf = GST_BUFFER (_data); GstUDPSink *udpsink; guint tolen, i; + gint tosend; + guint8 *data; - g_return_if_fail (pad != NULL); - g_return_if_fail (GST_IS_PAD (pad)); - g_return_if_fail (buf != NULL); - - udpsink = GST_UDPSINK (GST_OBJECT_PARENT (pad)); - - if (udpsink->clock && GST_BUFFER_TIMESTAMP_IS_VALID (buf)) { - gst_element_wait (GST_ELEMENT (udpsink), GST_BUFFER_TIMESTAMP (buf)); - } + udpsink = GST_UDPSINK (sink); tolen = sizeof (udpsink->theiraddr); - /* - if (sendto (udpsink->sock, GST_BUFFER_DATA (buf), - GST_BUFFER_SIZE (buf), 0, (struct sockaddr *) &udpsink->theiraddr, - tolen) == -1) { - perror("sending"); - } - */ - /* MTU */ - for (i = 0; i < GST_BUFFER_SIZE (buf); i += udpsink->mtu) { - if (GST_BUFFER_SIZE (buf) - i > udpsink->mtu) { - if (sendto (udpsink->sock, GST_BUFFER_DATA (buf) + i, - udpsink->mtu, 0, (struct sockaddr *) &udpsink->theiraddr, - tolen) == -1) { - perror ("sending"); - } - } else { - if (sendto (udpsink->sock, GST_BUFFER_DATA (buf) + i, - GST_BUFFER_SIZE (buf) - i, 0, - (struct sockaddr *) &udpsink->theiraddr, tolen) == -1) { - perror ("sending"); - } + tosend = GST_BUFFER_SIZE (buffer); + data = GST_BUFFER_DATA (buffer); + + /* send in chunks of MTU */ + while (tosend > 0) { + if (sendto (udpsink->sock, data, udpsink->mtu, + 0, (struct sockaddr *) &udpsink->theiraddr, tolen) == -1) { + perror ("sending"); } + + data += udpsink->mtu; + tosend -= udpsink->mtu; } - gst_buffer_unref (buf); + return GST_FLOW_OK; } static void @@ -320,8 +192,6 @@ gst_udpsink_set_property (GObject * object, guint prop_id, const GValue * value, { GstUDPSink *udpsink; - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_UDPSINK (object)); udpsink = GST_UDPSINK (object); switch (prop_id) { @@ -336,9 +206,6 @@ gst_udpsink_set_property (GObject * object, guint prop_id, const GValue * value, case ARG_PORT: udpsink->port = g_value_get_int (value); break; - case ARG_CONTROL: - udpsink->control = g_value_get_enum (value); - break; case ARG_MTU: udpsink->mtu = g_value_get_int (value); break; @@ -353,8 +220,6 @@ gst_udpsink_get_property (GObject * object, guint prop_id, GValue * value, { GstUDPSink *udpsink; - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_UDPSINK (object)); udpsink = GST_UDPSINK (object); switch (prop_id) { @@ -364,9 +229,6 @@ gst_udpsink_get_property (GObject * object, guint prop_id, GValue * value, case ARG_PORT: g_value_set_int (value, udpsink->port); break; - case ARG_CONTROL: - g_value_set_enum (value, udpsink->control); - break; case ARG_MTU: g_value_set_int (value, udpsink->mtu); break; @@ -432,8 +294,6 @@ gst_udpsink_init_send (GstUDPSink * sink) bc_val = 1; setsockopt (sink->sock, SOL_SOCKET, SO_BROADCAST, &bc_val, sizeof (bc_val)); - GST_FLAG_SET (sink, GST_UDPSINK_OPEN); - return TRUE; } @@ -441,27 +301,41 @@ static void gst_udpsink_close (GstUDPSink * sink) { close (sink->sock); - - GST_FLAG_UNSET (sink, GST_UDPSINK_OPEN); } static GstElementStateReturn gst_udpsink_change_state (GstElement * element) { - g_return_val_if_fail (GST_IS_UDPSINK (element), GST_STATE_FAILURE); - - if (GST_STATE_PENDING (element) == GST_STATE_NULL) { - if (GST_FLAG_IS_SET (element, GST_UDPSINK_OPEN)) - gst_udpsink_close (GST_UDPSINK (element)); - } else { - if (!GST_FLAG_IS_SET (element, GST_UDPSINK_OPEN)) { - if (!gst_udpsink_init_send (GST_UDPSINK (element))) - return GST_STATE_FAILURE; - } + GstElementStateReturn ret; + GstUDPSink *sink; + gint transition; + + sink = GST_UDPSINK (element); + transition = GST_STATE_TRANSITION (element); + + switch (transition) { + case GST_STATE_READY_TO_PAUSED: + if (!gst_udpsink_init_send (sink)) + goto no_init; + break; + default: + break; } - if (GST_ELEMENT_CLASS (parent_class)->change_state) - return GST_ELEMENT_CLASS (parent_class)->change_state (element); + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); - return GST_STATE_SUCCESS; + switch (transition) { + case GST_STATE_PAUSED_TO_READY: + gst_udpsink_close (sink); + break; + default: + break; + } + + return ret; + +no_init: + { + return GST_STATE_FAILURE; + } } diff --git a/gst/udp/gstudpsink.h b/gst/udp/gstudpsink.h index f49b4efe..0f65a254 100644 --- a/gst/udp/gstudpsink.h +++ b/gst/udp/gstudpsink.h @@ -21,12 +21,10 @@ #ifndef __GST_UDPSINK_H__ #define __GST_UDPSINK_H__ - #include <gst/gst.h> +#include <gst/base/gstbasesink.h> -#ifdef __cplusplus -extern "C" { -#endif /* __cplusplus */ +G_BEGIN_DECLS #include <stdio.h> #include <stdlib.h> @@ -43,56 +41,33 @@ extern "C" { #include <arpa/inet.h> #include "gstudp.h" -#define GST_TYPE_UDPSINK \ - (gst_udpsink_get_type()) -#define GST_UDPSINK(obj) \ - (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_UDPSINK,GstUDPSink)) -#define GST_UDPSINK_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_UDPSINK,GstUDPSink)) -#define GST_IS_UDPSINK(obj) \ - (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_UDPSINK)) -#define GST_IS_UDPSINK_CLASS(obj) \ - (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_UDPSINK)) +#define GST_TYPE_UDPSINK (gst_udpsink_get_type()) +#define GST_UDPSINK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_UDPSINK,GstUDPSink)) +#define GST_UDPSINK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_UDPSINK,GstUDPSink)) +#define GST_IS_UDPSINK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_UDPSINK)) +#define GST_IS_UDPSINK_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_UDPSINK)) typedef struct _GstUDPSink GstUDPSink; typedef struct _GstUDPSinkClass GstUDPSinkClass; -typedef enum { - GST_UDPSINK_OPEN = GST_ELEMENT_FLAG_LAST, - - GST_UDPSINK_FLAG_LAST = GST_ELEMENT_FLAG_LAST + 2, -} GstUDPSinkFlags; - struct _GstUDPSink { - GstElement element; - - /* pads */ - GstPad *sinkpad,*srcpad; + GstBaseSink parent; int sock; struct sockaddr_in theiraddr; struct ip_mreq multi_addr; gint port; - Gst_UDP_Control control; gchar *host; - guint mtu; - - GstClock *clock; }; struct _GstUDPSinkClass { - GstElementClass parent_class; - + GstBaseSinkClass parent_class; }; GType gst_udpsink_get_type(void); - -#ifdef __cplusplus -} -#endif /* __cplusplus */ - +G_END_DECLS #endif /* __GST_UDPSINK_H__ */ diff --git a/gst/udp/gstudpsrc.c b/gst/udp/gstudpsrc.c index 801176c9..f6901a31 100644 --- a/gst/udp/gstudpsrc.c +++ b/gst/udp/gstudpsrc.c @@ -28,6 +28,11 @@ #define UDP_DEFAULT_PORT 4951 #define UDP_DEFAULT_MULTICAST_GROUP "0.0.0.0" +static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + /* elementfactory information */ static GstElementDetails gst_udpsrc_details = GST_ELEMENT_DETAILS ("UDP packet receiver", @@ -46,42 +51,22 @@ enum { ARG_0, ARG_PORT, - ARG_CONTROL, ARG_MULTICAST_GROUP /* FILL ME */ }; -#define GST_TYPE_UDPSRC_CONTROL (gst_udpsrc_control_get_type()) -static GType -gst_udpsrc_control_get_type (void) -{ - static GType udpsrc_control_type = 0; - static GEnumValue udpsrc_control[] = { - {CONTROL_NONE, "1", "none"}, - {CONTROL_UDP, "2", "udp"}, - {CONTROL_TCP, "3", "tcp"}, - {CONTROL_ZERO, NULL, NULL}, - }; - - if (!udpsrc_control_type) { - udpsrc_control_type = - g_enum_register_static ("GstUDPSrcControl", udpsrc_control); - } - return udpsrc_control_type; -} - static void gst_udpsrc_base_init (gpointer g_class); static void gst_udpsrc_class_init (GstUDPSrc * klass); static void gst_udpsrc_init (GstUDPSrc * udpsrc); -static GstData *gst_udpsrc_get (GstPad * pad); +static void gst_udpsrc_loop (GstPad * pad); static GstElementStateReturn gst_udpsrc_change_state (GstElement * element); +static gboolean gst_udpsrc_activate (GstPad * pad, GstActivateMode mode); static void gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); static void gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -static void gst_udpsrc_set_clock (GstElement * element, GstClock * clock); static GstElementClass *parent_class = NULL; @@ -117,6 +102,9 @@ gst_udpsrc_base_init (gpointer g_class) { GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&src_template)); + gst_element_class_set_details (element_class, &gst_udpsrc_details); } @@ -131,55 +119,37 @@ gst_udpsrc_class_init (GstUDPSrc * klass) parent_class = g_type_class_ref (GST_TYPE_ELEMENT); + gobject_class->set_property = gst_udpsrc_set_property; + gobject_class->get_property = gst_udpsrc_get_property; + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT, g_param_spec_int ("port", "port", "The port to receive the packets from", 0, 32768, UDP_DEFAULT_PORT, G_PARAM_READWRITE)); - g_object_class_install_property (gobject_class, ARG_CONTROL, - g_param_spec_enum ("control", "control", "The type of control", - GST_TYPE_UDPSRC_CONTROL, CONTROL_UDP, G_PARAM_READWRITE)); g_object_class_install_property (gobject_class, ARG_MULTICAST_GROUP, g_param_spec_string ("multicast_group", "multicast_group", "The Address of multicast group to join", UDP_DEFAULT_MULTICAST_GROUP, G_PARAM_READWRITE)); - gobject_class->set_property = gst_udpsrc_set_property; - gobject_class->get_property = gst_udpsrc_get_property; - gstelement_class->change_state = gst_udpsrc_change_state; - gstelement_class->set_clock = gst_udpsrc_set_clock; -} - -static void -gst_udpsrc_set_clock (GstElement * element, GstClock * clock) -{ - GstUDPSrc *udpsrc; - - udpsrc = GST_UDPSRC (element); - - udpsrc->clock = clock; } static void gst_udpsrc_init (GstUDPSrc * udpsrc) { /* create the src and src pads */ - udpsrc->srcpad = gst_pad_new ("src", GST_PAD_SRC); + udpsrc->srcpad = gst_pad_new_from_template + (gst_static_pad_template_get (&src_template), "src"); + gst_pad_set_activate_function (udpsrc->srcpad, gst_udpsrc_activate); + gst_pad_set_loop_function (udpsrc->srcpad, gst_udpsrc_loop); gst_element_add_pad (GST_ELEMENT (udpsrc), udpsrc->srcpad); - gst_pad_set_get_function (udpsrc->srcpad, gst_udpsrc_get); udpsrc->port = UDP_DEFAULT_PORT; - udpsrc->control = CONTROL_UDP; - udpsrc->clock = NULL; udpsrc->sock = -1; - udpsrc->control_sock = -1; udpsrc->multi_group = g_strdup (UDP_DEFAULT_MULTICAST_GROUP); - - udpsrc->first_buf = TRUE; - udpsrc->defer_data = NULL; } -static GstData * -gst_udpsrc_get (GstPad * pad) +static void +gst_udpsrc_loop (GstPad * pad) { GstUDPSrc *udpsrc; GstBuffer *outbuf; @@ -189,148 +159,49 @@ gst_udpsrc_get (GstPad * pad) fd_set read_fds; guint max_sock; - g_return_val_if_fail (pad != NULL, NULL); - g_return_val_if_fail (GST_IS_PAD (pad), NULL); - udpsrc = GST_UDPSRC (GST_OBJECT_PARENT (pad)); - if (udpsrc->defer_data != NULL) { - GstData *outdata = udpsrc->defer_data; - - udpsrc->defer_data = NULL; - return outdata; - } - FD_ZERO (&read_fds); FD_SET (udpsrc->sock, &read_fds); - if (udpsrc->control != CONTROL_NONE) { - FD_SET (udpsrc->control_sock, &read_fds); - } - max_sock = MAX (udpsrc->sock, udpsrc->control_sock); - - if (select (max_sock + 1, &read_fds, NULL, NULL, NULL) > 0) { - if ((udpsrc->control_sock != -1) && - FD_ISSET (udpsrc->control_sock, &read_fds)) { -#ifndef GST_DISABLE_LOADSAVE - guchar *buf; - int ret; - int fdread; - struct sockaddr addr; - xmlDocPtr doc; - GstCaps *caps; - - buf = g_malloc (1024 * 10); - - switch (udpsrc->control) { - case CONTROL_TCP: - len = sizeof (struct sockaddr); - fdread = accept (udpsrc->control_sock, &addr, &len); - if (fdread < 0) { - perror ("accept"); - } - - ret = read (fdread, buf, 1024 * 10); - break; - case CONTROL_UDP: - len = sizeof (struct sockaddr); - ret = - recvfrom (udpsrc->control_sock, buf, 1024 * 10, 0, - (struct sockaddr *) &tmpaddr, &len); - if (ret < 0) { - perror ("recvfrom"); - } - break; - case CONTROL_NONE: - default: - g_free (buf); - return NULL; - break; - } - - buf[ret] = '\0'; - doc = xmlParseMemory (buf, ret); - caps = gst_caps_load_thyself (doc->xmlRootNode); - if (caps == NULL) { - return NULL; - } - - /* foward the connect, we don't signal back the result here... */ - if (gst_caps_is_fixed (caps)) { - gst_pad_try_set_caps (udpsrc->srcpad, caps); - } else { - GST_ERROR ("caps %" GST_PTR_FORMAT, caps); - GST_ELEMENT_ERROR (udpsrc, CORE, NEGOTIATION, (NULL), - ("Got unfixed caps from peer")); - } - g_free (buf); -#endif - - outbuf = NULL; - } else { - outbuf = gst_buffer_new (); - GST_BUFFER_DATA (outbuf) = g_malloc (24000); - GST_BUFFER_SIZE (outbuf) = 24000; + max_sock = udpsrc->sock; - if (udpsrc->first_buf) { - if (udpsrc->clock) { - GstClockTime current_time; - GstEvent *discont; + if (select (max_sock + 1, &read_fds, NULL, NULL, NULL) < 0) + goto select_error; - current_time = gst_clock_get_time (udpsrc->clock); + outbuf = gst_buffer_new (); + GST_BUFFER_DATA (outbuf) = g_malloc (24000); + GST_BUFFER_SIZE (outbuf) = 24000; - GST_BUFFER_TIMESTAMP (outbuf) = current_time; + len = sizeof (struct sockaddr); + if ((numbytes = recvfrom (udpsrc->sock, GST_BUFFER_DATA (outbuf), + GST_BUFFER_SIZE (outbuf), 0, (struct sockaddr *) &tmpaddr, + &len)) == -1) + goto receive_error; - discont = gst_event_new_discontinuous (FALSE, GST_FORMAT_TIME, - current_time, NULL); + GST_BUFFER_SIZE (outbuf) = numbytes; + gst_pad_push (udpsrc->srcpad, outbuf); - udpsrc->defer_data = GST_DATA (discont); - } + return; - udpsrc->first_buf = FALSE; - } - - else { - GST_BUFFER_TIMESTAMP (outbuf) = GST_CLOCK_TIME_NONE; - } - - len = sizeof (struct sockaddr); - numbytes = recvfrom (udpsrc->sock, GST_BUFFER_DATA (outbuf), - GST_BUFFER_SIZE (outbuf), 0, (struct sockaddr *) &tmpaddr, &len); - - if (numbytes != -1) { - GST_BUFFER_SIZE (outbuf) = numbytes; - } else { - perror ("recvfrom"); - gst_buffer_unref (outbuf); - outbuf = NULL; - } - } - } else { - perror ("select"); - outbuf = NULL; +select_error: + { + GST_DEBUG ("got select error"); + return; } - if (udpsrc->defer_data) { - GstData *databuf = udpsrc->defer_data; - - udpsrc->defer_data = GST_DATA (outbuf); - return databuf; +receive_error: + { + gst_buffer_unref (outbuf); + GST_DEBUG ("got receive error"); + return; } - - if (outbuf == NULL) - return GST_DATA (gst_event_new (GST_EVENT_EMPTY)); - - return GST_DATA (outbuf); } - static void gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { GstUDPSrc *udpsrc; - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_UDPSRC (object)); udpsrc = GST_UDPSRC (object); switch (prop_id) { @@ -346,9 +217,6 @@ gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value, udpsrc->multi_group = g_strdup (g_value_get_string (value)); break; - case ARG_CONTROL: - udpsrc->control = g_value_get_enum (value); - break; default: break; } @@ -360,8 +228,6 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value, { GstUDPSrc *udpsrc; - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_UDPSRC (object)); udpsrc = GST_UDPSRC (object); switch (prop_id) { @@ -371,9 +237,6 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value, case ARG_MULTICAST_GROUP: g_value_set_string (value, udpsrc->multi_group); break; - case ARG_CONTROL: - g_value_set_enum (value, udpsrc->control); - break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -421,54 +284,6 @@ gst_udpsrc_init_receive (GstUDPSrc * src) setsockopt (src->sock, SOL_SOCKET, SO_BROADCAST, &bc_val, sizeof (bc_val)); src->myaddr.sin_port = htons (src->port + 1); - switch (src->control) { - case CONTROL_TCP: - if ((src->control_sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) { - perror ("control_socket"); - return FALSE; - } - - if (bind (src->control_sock, (struct sockaddr *) &src->myaddr, - sizeof (src->myaddr)) == -1) { - perror ("control_bind"); - return FALSE; - } - - if (listen (src->control_sock, 5) == -1) { - perror ("listen"); - return FALSE; - } - - fcntl (src->control_sock, F_SETFL, O_NONBLOCK); - - break; - case CONTROL_UDP: - if ((src->control_sock = socket (AF_INET, SOCK_DGRAM, 0)) == -1) { - perror ("socket"); - return FALSE; - } - - if (bind (src->control_sock, (struct sockaddr *) &src->myaddr, - sizeof (src->myaddr)) == -1) { - perror ("control_bind"); - return FALSE; - } - /* We can only do broadcast in udp */ - bc_val = 1; - setsockopt (src->control_sock, SOL_SOCKET, SO_BROADCAST, &bc_val, - sizeof (bc_val)); - break; - case CONTROL_NONE: - GST_FLAG_SET (src, GST_UDPSRC_OPEN); - return TRUE; - break; - default: - return FALSE; - break; - } - - GST_FLAG_SET (src, GST_UDPSRC_OPEN); - return TRUE; } @@ -479,31 +294,86 @@ gst_udpsrc_close (GstUDPSrc * src) close (src->sock); src->sock = -1; } - if (src->control_sock != -1) { - close (src->control_sock); - src->control_sock = -1; - } +} + +static gboolean +gst_udpsrc_activate (GstPad * pad, GstActivateMode mode) +{ + gboolean result; + GstUDPSrc *udpsrc; - GST_FLAG_UNSET (src, GST_UDPSRC_OPEN); + udpsrc = GST_UDPSRC (GST_OBJECT_PARENT (pad)); + + switch (mode) { + case GST_ACTIVATE_PUSH: + /* if we have a scheduler we can start the task */ + if (GST_ELEMENT_SCHEDULER (udpsrc)) { + GST_STREAM_LOCK (pad); + GST_RPAD_TASK (pad) = + gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (udpsrc), + (GstTaskFunction) gst_udpsrc_loop, pad); + + gst_task_start (GST_RPAD_TASK (pad)); + GST_STREAM_UNLOCK (pad); + result = TRUE; + } + break; + case GST_ACTIVATE_PULL: + result = FALSE; + break; + case GST_ACTIVATE_NONE: + /* step 1, unblock clock sync (if any) */ + + /* step 2, make sure streaming finishes */ + GST_STREAM_LOCK (pad); + /* step 3, stop the task */ + if (GST_RPAD_TASK (pad)) { + gst_task_stop (GST_RPAD_TASK (pad)); + gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad))); + GST_RPAD_TASK (pad) = NULL; + } + GST_STREAM_UNLOCK (pad); + + result = TRUE; + break; + } + return result; } static GstElementStateReturn gst_udpsrc_change_state (GstElement * element) { - g_return_val_if_fail (GST_IS_UDPSRC (element), GST_STATE_FAILURE); - - if (GST_STATE_PENDING (element) == GST_STATE_NULL) { - if (GST_FLAG_IS_SET (element, GST_UDPSRC_OPEN)) - gst_udpsrc_close (GST_UDPSRC (element)); - } else { - if (!GST_FLAG_IS_SET (element, GST_UDPSRC_OPEN)) { - if (!gst_udpsrc_init_receive (GST_UDPSRC (element))) - return GST_STATE_FAILURE; - } + GstElementStateReturn ret; + GstUDPSrc *src; + gint transition; + + src = GST_UDPSRC (element); + + transition = GST_STATE_TRANSITION (element); + + switch (transition) { + case GST_STATE_READY_TO_PAUSED: + if (!gst_udpsrc_init_receive (src)) + goto no_init; + break; + default: + break; } - if (GST_ELEMENT_CLASS (parent_class)->change_state) - return GST_ELEMENT_CLASS (parent_class)->change_state (element); + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); - return GST_STATE_SUCCESS; + switch (transition) { + case GST_STATE_PAUSED_TO_READY: + gst_udpsrc_close (src); + break; + default: + break; + } + + return ret; + +no_init: + { + return GST_STATE_FAILURE; + } } diff --git a/gst/udp/gstudpsrc.h b/gst/udp/gstudpsrc.h index 7573a2c6..a2cfb2e9 100644 --- a/gst/udp/gstudpsrc.h +++ b/gst/udp/gstudpsrc.h @@ -52,12 +52,6 @@ extern "C" { typedef struct _GstUDPSrc GstUDPSrc; typedef struct _GstUDPSrcClass GstUDPSrcClass; -typedef enum { - GST_UDPSRC_OPEN = GST_ELEMENT_FLAG_LAST, - - GST_UDPSRC_FLAG_LAST = GST_ELEMENT_FLAG_LAST + 2, -} GstUDPSrcFlags; - struct _GstUDPSrc { GstElement element; @@ -66,16 +60,10 @@ struct _GstUDPSrc { int port; int sock; - int control_sock; - Gst_UDP_Control control; gchar *multi_group; struct sockaddr_in myaddr; struct ip_mreq multi_addr; - GstClock *clock; - - gboolean first_buf; - GstData *defer_data; }; struct _GstUDPSrcClass { |