From aeb4ab082e7a7e5b28650706424be5e72ec1eb8f Mon Sep 17 00:00:00 2001 From: Andy Wingo Date: Fri, 7 Oct 2005 15:24:24 +0000 Subject: ext/raw1394/gstdv1394src.c: Make interruptible, so it won't block forever in a read(). Original commit message from CVS: 2005-10-07 Andy Wingo * ext/raw1394/gstdv1394src.c: Make interruptible, so it won't block forever in a read(). --- ext/raw1394/gstdv1394src.c | 130 ++++++++++++++++++++++++++++++++++++++++----- ext/raw1394/gstdv1394src.h | 8 +-- 2 files changed, 119 insertions(+), 19 deletions(-) (limited to 'ext/raw1394') diff --git a/ext/raw1394/gstdv1394src.c b/ext/raw1394/gstdv1394src.c index 012d09a7..c408357d 100644 --- a/ext/raw1394/gstdv1394src.c +++ b/ext/raw1394/gstdv1394src.c @@ -23,14 +23,40 @@ #ifdef HAVE_CONFIG_H #include "config.h" #endif -#include +#include +#include +#include +#include +#include #include + #include #include #include +#include + +#include #include "gstdv1394src.h" + +#define CONTROL_STOP 'S' /* stop the select call */ +#define CONTROL_SOCKETS(src) src->control_sock +#define WRITE_SOCKET(src) src->control_sock[1] +#define READ_SOCKET(src) src->control_sock[0] + +#define SEND_COMMAND(src, command) \ +G_STMT_START { \ + unsigned char c; c = command; \ + write (WRITE_SOCKET(src), &c, 1); \ +} G_STMT_END + +#define READ_COMMAND(src, command, res) \ +G_STMT_START { \ + res = read(READ_SOCKET(src), &command, 1); \ +} G_STMT_END + + GST_DEBUG_CATEGORY_STATIC (dv1394src_debug); #define GST_CAT_DEFAULT (dv1394src_debug) @@ -91,6 +117,7 @@ static void gst_dv1394src_get_property (GObject * object, guint prop_id, static gboolean gst_dv1394src_start (GstBaseSrc * bsrc); static gboolean gst_dv1394src_stop (GstBaseSrc * bsrc); +static gboolean gst_dv1394src_unlock (GstBaseSrc * bsrc); static GstFlowReturn gst_dv1394src_create (GstPushSrc * psrc, GstBuffer ** buf); @@ -181,6 +208,8 @@ gst_dv1394src_class_init (GstDV1394SrcClass * klass) gstbasesrc_class->negotiate = NULL; gstbasesrc_class->start = gst_dv1394src_start; gstbasesrc_class->stop = gst_dv1394src_stop; + gstbasesrc_class->unlock = gst_dv1394src_unlock; + gstpushsrc_class->create = gst_dv1394src_create; } @@ -195,7 +224,6 @@ gst_dv1394src_init (GstDV1394Src * dv1394src, GstDV1394SrcClass * klass) gst_pad_set_query_function (srcpad, gst_dv1394src_query); gst_pad_set_query_type_function (srcpad, gst_dv1394src_get_query_types); - dv1394src->dv_lock = g_mutex_new (); dv1394src->port = DEFAULT_PORT; dv1394src->channel = DEFAULT_CHANNEL; @@ -205,6 +233,9 @@ gst_dv1394src_init (GstDV1394Src * dv1394src, GstDV1394SrcClass * klass) dv1394src->use_avc = DEFAULT_USE_AVC; dv1394src->guid = DEFAULT_GUID; + READ_SOCKET (dv1394src) = -1; + WRITE_SOCKET (dv1394src) = -1; + /* initialized when first header received */ dv1394src->frame_size = 0; @@ -302,7 +333,7 @@ gst_dv1394src_iso_receive (raw1394handle_t handle, int channel, size_t len, */ if (section_type == 0 && dif_sequence == 0) { // dif header - if (!dv1394src->negotiated) { + if (!GST_PAD_CAPS (GST_BASE_SRC_PAD (dv1394src))) { GstCaps *caps; // figure format (NTSC/PAL) @@ -326,7 +357,6 @@ gst_dv1394src_iso_receive (raw1394handle_t handle, int channel, size_t len, } gst_pad_set_caps (GST_BASE_SRC_PAD (dv1394src), caps); gst_caps_unref (caps); - dv1394src->negotiated = TRUE; } // drop last frame when not complete if (!dv1394src->drop_incomplete @@ -417,20 +447,67 @@ gst_dv1394src_create (GstPushSrc * psrc, GstBuffer ** buf) { GstDV1394Src *dv1394src = GST_DV1394SRC (psrc); GstCaps *caps; + struct pollfd pollfds[2]; - GST_DV_LOCK (dv1394src); - dv1394src->buf = NULL; - while (dv1394src->buf == NULL) - raw1394_loop_iterate (dv1394src->handle); + pollfds[0].fd = raw1394_get_fd (dv1394src->handle); + pollfds[0].events = POLLIN | POLLERR | POLLHUP | POLLPRI; + pollfds[1].fd = READ_SOCKET (dv1394src); + pollfds[1].events = POLLIN | POLLERR | POLLHUP | POLLPRI; + + if (dv1394src->buf) { + /* maybe we had an error before, and there's a stale buffer? */ + gst_buffer_unref (dv1394src->buf); + dv1394src->buf = NULL; + } + + while (TRUE) { + int res = poll (pollfds, 2, -1); + + if (res < 0) { + if (errno == EAGAIN || errno == EINTR) + continue; + else + goto error_while_polling; + } + + if (pollfds[1].revents) { + char command; + + g_print ("told to stop!\n"); + + if (pollfds[1].revents & POLLIN) + READ_COMMAND (dv1394src, command, res); + + goto told_to_stop; + } else if (pollfds[0].revents & POLLIN) { + /* shouldn't block in theory */ + raw1394_loop_iterate (dv1394src->handle); + if (dv1394src->buf) + break; + } + } + + g_assert (dv1394src->buf); caps = gst_pad_get_caps (GST_BASE_SRC_PAD (psrc)); gst_buffer_set_caps (dv1394src->buf, caps); gst_caps_unref (caps); *buf = dv1394src->buf; - GST_DV_UNLOCK (dv1394src); + dv1394src->buf = NULL; return GST_FLOW_OK; + +error_while_polling: + { + GST_ELEMENT_ERROR (dv1394src, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); + return GST_FLOW_UNEXPECTED; + } +told_to_stop: + { + GST_DEBUG_OBJECT (dv1394src, "told to stop, shutting down"); + return GST_FLOW_WRONG_STATE; + } } static int @@ -504,6 +581,16 @@ static gboolean gst_dv1394src_start (GstBaseSrc * bsrc) { GstDV1394Src *src = GST_DV1394SRC (bsrc); + int control_sock[2]; + + if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0) + goto socket_pair; + + READ_SOCKET (src) = control_sock[0]; + WRITE_SOCKET (src) = control_sock[1]; + + fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK); + fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK); src->handle = raw1394_new_handle (); @@ -542,6 +629,12 @@ gst_dv1394src_start (GstBaseSrc * bsrc) return TRUE; +socket_pair: + { + GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), + GST_ERROR_SYSTEM); + return FALSE; + } no_handle: { GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL), @@ -573,7 +666,11 @@ gst_dv1394src_stop (GstBaseSrc * bsrc) { GstDV1394Src *src = GST_DV1394SRC (bsrc); - GST_DV_LOCK (src); + close (READ_SOCKET (src)); + close (WRITE_SOCKET (src)); + READ_SOCKET (src) = -1; + WRITE_SOCKET (src) = -1; + raw1394_stop_iso_rcv (src->handle, src->channel); if (src->use_avc) { /* pause the VCR */ @@ -582,8 +679,6 @@ gst_dv1394src_stop (GstBaseSrc * bsrc) != AVC1394_VCR_OPERAND_PLAY_FORWARD_PAUSE)) avc1394_vcr_pause (src->handle, src->avc_node); } - GST_DV_UNLOCK (src); - src->negotiated = FALSE; if (src->use_avc) /* stop the VCR */ @@ -594,6 +689,17 @@ gst_dv1394src_stop (GstBaseSrc * bsrc) return TRUE; } +static gboolean +gst_dv1394src_unlock (GstBaseSrc * bsrc) +{ + GstDV1394Src *src = GST_DV1394SRC (bsrc); + + g_print ("sending command!\n"); + SEND_COMMAND (src, CONTROL_STOP); + + return TRUE; +} + static gboolean gst_dv1394src_convert (GstPad * pad, GstFormat src_format, gint64 src_value, diff --git a/ext/raw1394/gstdv1394src.h b/ext/raw1394/gstdv1394src.h index 23fc65bb..7f90d484 100644 --- a/ext/raw1394/gstdv1394src.h +++ b/ext/raw1394/gstdv1394src.h @@ -43,10 +43,6 @@ G_BEGIN_DECLS typedef struct _GstDV1394Src GstDV1394Src; typedef struct _GstDV1394SrcClass GstDV1394SrcClass; -#define GST_DV_GET_LOCK(dv) (GST_DV1394SRC (dv)->dv_lock) -#define GST_DV_LOCK(dv) g_mutex_lock(GST_DV_GET_LOCK (dv)) -#define GST_DV_UNLOCK(dv) g_mutex_unlock(GST_DV_GET_LOCK (dv)) - struct _GstDV1394Src { GstPushSrc element; @@ -55,8 +51,6 @@ struct _GstDV1394Src { gint skip; gboolean drop_incomplete; - GMutex *dv_lock; - gint num_ports; gint port; gint channel; @@ -75,7 +69,7 @@ struct _GstDV1394Src { guint bytes_in_frame; guint frame_sequence; - gboolean negotiated; + int control_sock[2]; gchar *uri; }; -- cgit