diff options
Diffstat (limited to 'gst/udp/gstudpsrc.c')
-rw-r--r-- | gst/udp/gstudpsrc.c | 360 |
1 files changed, 115 insertions, 245 deletions
diff --git a/gst/udp/gstudpsrc.c b/gst/udp/gstudpsrc.c index 801176c9..f6901a31 100644 --- a/gst/udp/gstudpsrc.c +++ b/gst/udp/gstudpsrc.c @@ -28,6 +28,11 @@ #define UDP_DEFAULT_PORT 4951 #define UDP_DEFAULT_MULTICAST_GROUP "0.0.0.0" +static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + /* elementfactory information */ static GstElementDetails gst_udpsrc_details = GST_ELEMENT_DETAILS ("UDP packet receiver", @@ -46,42 +51,22 @@ enum { ARG_0, ARG_PORT, - ARG_CONTROL, ARG_MULTICAST_GROUP /* FILL ME */ }; -#define GST_TYPE_UDPSRC_CONTROL (gst_udpsrc_control_get_type()) -static GType -gst_udpsrc_control_get_type (void) -{ - static GType udpsrc_control_type = 0; - static GEnumValue udpsrc_control[] = { - {CONTROL_NONE, "1", "none"}, - {CONTROL_UDP, "2", "udp"}, - {CONTROL_TCP, "3", "tcp"}, - {CONTROL_ZERO, NULL, NULL}, - }; - - if (!udpsrc_control_type) { - udpsrc_control_type = - g_enum_register_static ("GstUDPSrcControl", udpsrc_control); - } - return udpsrc_control_type; -} - static void gst_udpsrc_base_init (gpointer g_class); static void gst_udpsrc_class_init (GstUDPSrc * klass); static void gst_udpsrc_init (GstUDPSrc * udpsrc); -static GstData *gst_udpsrc_get (GstPad * pad); +static void gst_udpsrc_loop (GstPad * pad); static GstElementStateReturn gst_udpsrc_change_state (GstElement * element); +static gboolean gst_udpsrc_activate (GstPad * pad, GstActivateMode mode); static void gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); static void gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -static void gst_udpsrc_set_clock (GstElement * element, GstClock * clock); static GstElementClass *parent_class = NULL; @@ -117,6 +102,9 @@ gst_udpsrc_base_init (gpointer g_class) { GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); + gst_element_class_add_pad_template (element_class, + gst_static_pad_template_get (&src_template)); + gst_element_class_set_details (element_class, &gst_udpsrc_details); } @@ -131,55 +119,37 @@ gst_udpsrc_class_init (GstUDPSrc * klass) parent_class = g_type_class_ref (GST_TYPE_ELEMENT); + gobject_class->set_property = gst_udpsrc_set_property; + gobject_class->get_property = gst_udpsrc_get_property; + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT, g_param_spec_int ("port", "port", "The port to receive the packets from", 0, 32768, UDP_DEFAULT_PORT, G_PARAM_READWRITE)); - g_object_class_install_property (gobject_class, ARG_CONTROL, - g_param_spec_enum ("control", "control", "The type of control", - GST_TYPE_UDPSRC_CONTROL, CONTROL_UDP, G_PARAM_READWRITE)); g_object_class_install_property (gobject_class, ARG_MULTICAST_GROUP, g_param_spec_string ("multicast_group", "multicast_group", "The Address of multicast group to join", UDP_DEFAULT_MULTICAST_GROUP, G_PARAM_READWRITE)); - gobject_class->set_property = gst_udpsrc_set_property; - gobject_class->get_property = gst_udpsrc_get_property; - gstelement_class->change_state = gst_udpsrc_change_state; - gstelement_class->set_clock = gst_udpsrc_set_clock; -} - -static void -gst_udpsrc_set_clock (GstElement * element, GstClock * clock) -{ - GstUDPSrc *udpsrc; - - udpsrc = GST_UDPSRC (element); - - udpsrc->clock = clock; } static void gst_udpsrc_init (GstUDPSrc * udpsrc) { /* create the src and src pads */ - udpsrc->srcpad = gst_pad_new ("src", GST_PAD_SRC); + udpsrc->srcpad = gst_pad_new_from_template + (gst_static_pad_template_get (&src_template), "src"); + gst_pad_set_activate_function (udpsrc->srcpad, gst_udpsrc_activate); + gst_pad_set_loop_function (udpsrc->srcpad, gst_udpsrc_loop); gst_element_add_pad (GST_ELEMENT (udpsrc), udpsrc->srcpad); - gst_pad_set_get_function (udpsrc->srcpad, gst_udpsrc_get); udpsrc->port = UDP_DEFAULT_PORT; - udpsrc->control = CONTROL_UDP; - udpsrc->clock = NULL; udpsrc->sock = -1; - udpsrc->control_sock = -1; udpsrc->multi_group = g_strdup (UDP_DEFAULT_MULTICAST_GROUP); - - udpsrc->first_buf = TRUE; - udpsrc->defer_data = NULL; } -static GstData * -gst_udpsrc_get (GstPad * pad) +static void +gst_udpsrc_loop (GstPad * pad) { GstUDPSrc *udpsrc; GstBuffer *outbuf; @@ -189,148 +159,49 @@ gst_udpsrc_get (GstPad * pad) fd_set read_fds; guint max_sock; - g_return_val_if_fail (pad != NULL, NULL); - g_return_val_if_fail (GST_IS_PAD (pad), NULL); - udpsrc = GST_UDPSRC (GST_OBJECT_PARENT (pad)); - if (udpsrc->defer_data != NULL) { - GstData *outdata = udpsrc->defer_data; - - udpsrc->defer_data = NULL; - return outdata; - } - FD_ZERO (&read_fds); FD_SET (udpsrc->sock, &read_fds); - if (udpsrc->control != CONTROL_NONE) { - FD_SET (udpsrc->control_sock, &read_fds); - } - max_sock = MAX (udpsrc->sock, udpsrc->control_sock); - - if (select (max_sock + 1, &read_fds, NULL, NULL, NULL) > 0) { - if ((udpsrc->control_sock != -1) && - FD_ISSET (udpsrc->control_sock, &read_fds)) { -#ifndef GST_DISABLE_LOADSAVE - guchar *buf; - int ret; - int fdread; - struct sockaddr addr; - xmlDocPtr doc; - GstCaps *caps; - - buf = g_malloc (1024 * 10); - - switch (udpsrc->control) { - case CONTROL_TCP: - len = sizeof (struct sockaddr); - fdread = accept (udpsrc->control_sock, &addr, &len); - if (fdread < 0) { - perror ("accept"); - } - - ret = read (fdread, buf, 1024 * 10); - break; - case CONTROL_UDP: - len = sizeof (struct sockaddr); - ret = - recvfrom (udpsrc->control_sock, buf, 1024 * 10, 0, - (struct sockaddr *) &tmpaddr, &len); - if (ret < 0) { - perror ("recvfrom"); - } - break; - case CONTROL_NONE: - default: - g_free (buf); - return NULL; - break; - } - - buf[ret] = '\0'; - doc = xmlParseMemory (buf, ret); - caps = gst_caps_load_thyself (doc->xmlRootNode); - if (caps == NULL) { - return NULL; - } - - /* foward the connect, we don't signal back the result here... */ - if (gst_caps_is_fixed (caps)) { - gst_pad_try_set_caps (udpsrc->srcpad, caps); - } else { - GST_ERROR ("caps %" GST_PTR_FORMAT, caps); - GST_ELEMENT_ERROR (udpsrc, CORE, NEGOTIATION, (NULL), - ("Got unfixed caps from peer")); - } - g_free (buf); -#endif - - outbuf = NULL; - } else { - outbuf = gst_buffer_new (); - GST_BUFFER_DATA (outbuf) = g_malloc (24000); - GST_BUFFER_SIZE (outbuf) = 24000; + max_sock = udpsrc->sock; - if (udpsrc->first_buf) { - if (udpsrc->clock) { - GstClockTime current_time; - GstEvent *discont; + if (select (max_sock + 1, &read_fds, NULL, NULL, NULL) < 0) + goto select_error; - current_time = gst_clock_get_time (udpsrc->clock); + outbuf = gst_buffer_new (); + GST_BUFFER_DATA (outbuf) = g_malloc (24000); + GST_BUFFER_SIZE (outbuf) = 24000; - GST_BUFFER_TIMESTAMP (outbuf) = current_time; + len = sizeof (struct sockaddr); + if ((numbytes = recvfrom (udpsrc->sock, GST_BUFFER_DATA (outbuf), + GST_BUFFER_SIZE (outbuf), 0, (struct sockaddr *) &tmpaddr, + &len)) == -1) + goto receive_error; - discont = gst_event_new_discontinuous (FALSE, GST_FORMAT_TIME, - current_time, NULL); + GST_BUFFER_SIZE (outbuf) = numbytes; + gst_pad_push (udpsrc->srcpad, outbuf); - udpsrc->defer_data = GST_DATA (discont); - } + return; - udpsrc->first_buf = FALSE; - } - - else { - GST_BUFFER_TIMESTAMP (outbuf) = GST_CLOCK_TIME_NONE; - } - - len = sizeof (struct sockaddr); - numbytes = recvfrom (udpsrc->sock, GST_BUFFER_DATA (outbuf), - GST_BUFFER_SIZE (outbuf), 0, (struct sockaddr *) &tmpaddr, &len); - - if (numbytes != -1) { - GST_BUFFER_SIZE (outbuf) = numbytes; - } else { - perror ("recvfrom"); - gst_buffer_unref (outbuf); - outbuf = NULL; - } - } - } else { - perror ("select"); - outbuf = NULL; +select_error: + { + GST_DEBUG ("got select error"); + return; } - if (udpsrc->defer_data) { - GstData *databuf = udpsrc->defer_data; - - udpsrc->defer_data = GST_DATA (outbuf); - return databuf; +receive_error: + { + gst_buffer_unref (outbuf); + GST_DEBUG ("got receive error"); + return; } - - if (outbuf == NULL) - return GST_DATA (gst_event_new (GST_EVENT_EMPTY)); - - return GST_DATA (outbuf); } - static void gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) { GstUDPSrc *udpsrc; - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_UDPSRC (object)); udpsrc = GST_UDPSRC (object); switch (prop_id) { @@ -346,9 +217,6 @@ gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value, udpsrc->multi_group = g_strdup (g_value_get_string (value)); break; - case ARG_CONTROL: - udpsrc->control = g_value_get_enum (value); - break; default: break; } @@ -360,8 +228,6 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value, { GstUDPSrc *udpsrc; - /* it's not null if we got it, but it might not be ours */ - g_return_if_fail (GST_IS_UDPSRC (object)); udpsrc = GST_UDPSRC (object); switch (prop_id) { @@ -371,9 +237,6 @@ gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value, case ARG_MULTICAST_GROUP: g_value_set_string (value, udpsrc->multi_group); break; - case ARG_CONTROL: - g_value_set_enum (value, udpsrc->control); - break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -421,54 +284,6 @@ gst_udpsrc_init_receive (GstUDPSrc * src) setsockopt (src->sock, SOL_SOCKET, SO_BROADCAST, &bc_val, sizeof (bc_val)); src->myaddr.sin_port = htons (src->port + 1); - switch (src->control) { - case CONTROL_TCP: - if ((src->control_sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) { - perror ("control_socket"); - return FALSE; - } - - if (bind (src->control_sock, (struct sockaddr *) &src->myaddr, - sizeof (src->myaddr)) == -1) { - perror ("control_bind"); - return FALSE; - } - - if (listen (src->control_sock, 5) == -1) { - perror ("listen"); - return FALSE; - } - - fcntl (src->control_sock, F_SETFL, O_NONBLOCK); - - break; - case CONTROL_UDP: - if ((src->control_sock = socket (AF_INET, SOCK_DGRAM, 0)) == -1) { - perror ("socket"); - return FALSE; - } - - if (bind (src->control_sock, (struct sockaddr *) &src->myaddr, - sizeof (src->myaddr)) == -1) { - perror ("control_bind"); - return FALSE; - } - /* We can only do broadcast in udp */ - bc_val = 1; - setsockopt (src->control_sock, SOL_SOCKET, SO_BROADCAST, &bc_val, - sizeof (bc_val)); - break; - case CONTROL_NONE: - GST_FLAG_SET (src, GST_UDPSRC_OPEN); - return TRUE; - break; - default: - return FALSE; - break; - } - - GST_FLAG_SET (src, GST_UDPSRC_OPEN); - return TRUE; } @@ -479,31 +294,86 @@ gst_udpsrc_close (GstUDPSrc * src) close (src->sock); src->sock = -1; } - if (src->control_sock != -1) { - close (src->control_sock); - src->control_sock = -1; - } +} + +static gboolean +gst_udpsrc_activate (GstPad * pad, GstActivateMode mode) +{ + gboolean result; + GstUDPSrc *udpsrc; - GST_FLAG_UNSET (src, GST_UDPSRC_OPEN); + udpsrc = GST_UDPSRC (GST_OBJECT_PARENT (pad)); + + switch (mode) { + case GST_ACTIVATE_PUSH: + /* if we have a scheduler we can start the task */ + if (GST_ELEMENT_SCHEDULER (udpsrc)) { + GST_STREAM_LOCK (pad); + GST_RPAD_TASK (pad) = + gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (udpsrc), + (GstTaskFunction) gst_udpsrc_loop, pad); + + gst_task_start (GST_RPAD_TASK (pad)); + GST_STREAM_UNLOCK (pad); + result = TRUE; + } + break; + case GST_ACTIVATE_PULL: + result = FALSE; + break; + case GST_ACTIVATE_NONE: + /* step 1, unblock clock sync (if any) */ + + /* step 2, make sure streaming finishes */ + GST_STREAM_LOCK (pad); + /* step 3, stop the task */ + if (GST_RPAD_TASK (pad)) { + gst_task_stop (GST_RPAD_TASK (pad)); + gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad))); + GST_RPAD_TASK (pad) = NULL; + } + GST_STREAM_UNLOCK (pad); + + result = TRUE; + break; + } + return result; } static GstElementStateReturn gst_udpsrc_change_state (GstElement * element) { - g_return_val_if_fail (GST_IS_UDPSRC (element), GST_STATE_FAILURE); - - if (GST_STATE_PENDING (element) == GST_STATE_NULL) { - if (GST_FLAG_IS_SET (element, GST_UDPSRC_OPEN)) - gst_udpsrc_close (GST_UDPSRC (element)); - } else { - if (!GST_FLAG_IS_SET (element, GST_UDPSRC_OPEN)) { - if (!gst_udpsrc_init_receive (GST_UDPSRC (element))) - return GST_STATE_FAILURE; - } + GstElementStateReturn ret; + GstUDPSrc *src; + gint transition; + + src = GST_UDPSRC (element); + + transition = GST_STATE_TRANSITION (element); + + switch (transition) { + case GST_STATE_READY_TO_PAUSED: + if (!gst_udpsrc_init_receive (src)) + goto no_init; + break; + default: + break; } - if (GST_ELEMENT_CLASS (parent_class)->change_state) - return GST_ELEMENT_CLASS (parent_class)->change_state (element); + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); - return GST_STATE_SUCCESS; + switch (transition) { + case GST_STATE_PAUSED_TO_READY: + gst_udpsrc_close (src); + break; + default: + break; + } + + return ret; + +no_init: + { + return GST_STATE_FAILURE; + } } |