gstreamer_core/plugins/elements/gstfdsrc.c
branchRCL_3
changeset 29 567bb019e3e3
parent 0 0e761a78d257
child 30 7e817e7e631c
--- a/gstreamer_core/plugins/elements/gstfdsrc.c	Wed Mar 31 22:03:18 2010 +0300
+++ b/gstreamer_core/plugins/elements/gstfdsrc.c	Tue Aug 31 15:30:33 2010 +0300
@@ -22,22 +22,54 @@
  */
 /**
  * SECTION:element-fdsrc
- * @short_description: read from a unix file descriptor
  * @see_also: #GstFdSink
  *
  * Read data from a unix file descriptor.
+ * 
+ * To generate data, enter some data on the console folowed by enter.
+ * The above mentioned pipeline should dump data packets to the console.
+ * 
+ * If the #GstFdSrc:timeout property is set to a value bigger than 0, fdsrc will
+ * generate an element message named
+ * <classname>&quot;GstFdSrcTimeout&quot;</classname>
+ * if no data was recieved in the given timeout.
+ * The message's structure contains one field:
+ * <itemizedlist>
+ * <listitem>
+ *   <para>
+ *   #guint64
+ *   <classname>&quot;timeout&quot;</classname>: the timeout in microseconds that
+ *   expired when waiting for data.
+ *   </para>
+ * </listitem>
+ * </itemizedlist>
+ * 
+ * <refsect2>
+ * <title>Example launch line</title>
+ * |[
+ * echo "Hello GStreamer" | gst-launch -v fdsrc ! fakesink dump=true
+ * ]| A simple pipeline to read from the standard input and dump the data
+ * with a fakesink as hex ascii block.
+ * </refsect2>
+ * 
+ * Last reviewed on 2008-06-20 (0.10.21)
  */
 
-
 #ifdef HAVE_CONFIG_H
 #  include "config.h"
 #endif
-#ifdef __SYMBIAN32__
-#include <gst_global.h>
-#endif
 #include "gst/gst_private.h"
 
 #include <sys/types.h>
+
+#ifdef G_OS_WIN32
+#include <io.h>                 /* lseek, open, close, read */
+#undef lseek
+#define lseek _lseeki64
+#undef off_t
+#define off_t guint64
+#endif
+
 #include <sys/stat.h>
 #include <sys/socket.h>
 #include <fcntl.h>
@@ -52,14 +84,6 @@
 #include <errno.h>
 
 #include "gstfdsrc.h"
-#ifdef __SYMBIAN32__
-#include <glib_global.h>
-#include <gobject_global.h>
-#include <gstpoll.h>
-#include <gstelement.h>
-#endif
-
-#define DEFAULT_BLOCKSIZE       4096
 
 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
     GST_PAD_SRC,
@@ -69,10 +93,17 @@
 GST_DEBUG_CATEGORY_STATIC (gst_fd_src_debug);
 #define GST_CAT_DEFAULT gst_fd_src_debug
 
+#define DEFAULT_FD              0
+#define DEFAULT_TIMEOUT         0
+
 enum
 {
   PROP_0,
+
   PROP_FD,
+  PROP_TIMEOUT,
+
+  PROP_LAST
 };
 
 static void gst_fd_src_uri_handler_init (gpointer g_iface, gpointer iface_data);
@@ -108,6 +139,7 @@
 static gboolean gst_fd_src_is_seekable (GstBaseSrc * bsrc);
 static gboolean gst_fd_src_get_size (GstBaseSrc * src, guint64 * size);
 static gboolean gst_fd_src_do_seek (GstBaseSrc * src, GstSegment * segment);
+static gboolean gst_fd_src_query (GstBaseSrc * src, GstQuery * query);
 
 static GstFlowReturn gst_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf);
 
@@ -129,11 +161,9 @@
 {
   GObjectClass *gobject_class;
   GstBaseSrcClass *gstbasesrc_class;
-  GstElementClass *gstelement_class;
   GstPushSrcClass *gstpush_src_class;
 
   gobject_class = G_OBJECT_CLASS (klass);
-  gstelement_class = GST_ELEMENT_CLASS (klass);
   gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
   gstpush_src_class = GST_PUSH_SRC_CLASS (klass);
 
@@ -145,7 +175,19 @@
 
   g_object_class_install_property (gobject_class, PROP_FD,
       g_param_spec_int ("fd", "fd", "An open file descriptor to read from",
-          0, G_MAXINT, 0, G_PARAM_READWRITE));
+          0, G_MAXINT, DEFAULT_FD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  /**
+   * GstFdSrc:timeout
+   *
+   * Post a message after timeout microseconds
+   *
+   * Since: 0.10.21
+   */
+  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
+      g_param_spec_uint64 ("timeout", "Timeout",
+          "Post a message after timeout microseconds (0 = disabled)", 0,
+          G_MAXUINT64, DEFAULT_TIMEOUT,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_fd_src_start);
   gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_fd_src_stop);
@@ -154,6 +196,7 @@
   gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR (gst_fd_src_is_seekable);
   gstbasesrc_class->get_size = GST_DEBUG_FUNCPTR (gst_fd_src_get_size);
   gstbasesrc_class->do_seek = GST_DEBUG_FUNCPTR (gst_fd_src_do_seek);
+  gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_fd_src_query);
 
   gstpush_src_class->create = GST_DEBUG_FUNCPTR (gst_fd_src_create);
 }
@@ -161,9 +204,10 @@
 static void
 gst_fd_src_init (GstFdSrc * fdsrc, GstFdSrcClass * klass)
 {
-  fdsrc->fd = -1;
   fdsrc->new_fd = 0;
   fdsrc->seekable_fd = FALSE;
+  fdsrc->fd = DEFAULT_FD;
+  fdsrc->timeout = DEFAULT_TIMEOUT;
   fdsrc->uri = g_strdup_printf ("fd://0");
   fdsrc->curoffset = 0;
 }
@@ -185,7 +229,7 @@
   struct stat stat_results;
 
   /* we need to always update the fdset since it may not have existed when
-   * gst_fd_src_update_fd() was called earlier */
+   * gst_fd_src_update_fd () was called earlier */
   if (src->fdset != NULL) {
     GstPollFD fd = GST_POLL_FD_INIT;
 
@@ -312,6 +356,11 @@
       }
       GST_OBJECT_UNLOCK (object);
       break;
+    case PROP_TIMEOUT:
+      src->timeout = g_value_get_uint64 (value);
+      GST_DEBUG_OBJECT (src, "poll timeout set to %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (src->timeout));
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -328,6 +377,9 @@
     case PROP_FD:
       g_value_set_int (value, src->fd);
       break;
+    case PROP_TIMEOUT:
+      g_value_set_uint64 (value, src->timeout);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -341,30 +393,59 @@
   GstBuffer *buf;
   gssize readbytes;
   guint blocksize;
+  GstClockTime timeout;
 
 #ifndef HAVE_WIN32
+  gboolean try_again;
   gint retval;
 #endif
 
   src = GST_FD_SRC (psrc);
 
+  if (src->timeout > 0) {
+    timeout = src->timeout * GST_USECOND;
+  } else {
+    timeout = GST_CLOCK_TIME_NONE;
+  }
+
 #ifndef HAVE_WIN32
   do {
-    retval = gst_poll_wait (src->fdset, GST_CLOCK_TIME_NONE);
-  } while (retval == -1 && (errno == EINTR || errno == EAGAIN));        /* retry if interrupted */
+    try_again = FALSE;
+
+    GST_LOG_OBJECT (src, "doing poll, timeout %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (src->timeout));
+
+    retval = gst_poll_wait (src->fdset, timeout);
+    GST_LOG_OBJECT (src, "poll returned %d", retval);
 
-  if (retval == -1) {
-    if (errno == EBUSY)
-      goto stopped;
-    else
-      goto select_error;
-  }
+    if (G_UNLIKELY (retval == -1)) {
+      if (errno == EINTR || errno == EAGAIN) {
+        /* retry if interrupted */
+        try_again = TRUE;
+      } else if (errno == EBUSY) {
+        goto stopped;
+      } else {
+        goto poll_error;
+      }
+    } else if (G_UNLIKELY (retval == 0)) {
+      try_again = TRUE;
+      /* timeout, post element message */
+      gst_element_post_message (GST_ELEMENT_CAST (src),
+          gst_message_new_element (GST_OBJECT_CAST (src),
+              gst_structure_new ("GstFdSrcTimeout",
+                  "timeout", G_TYPE_UINT64, src->timeout, NULL)));
+    }
+  } while (G_UNLIKELY (try_again));     /* retry if interrupted or timeout */
 #endif
 
   blocksize = GST_BASE_SRC (src)->blocksize;
 
   /* create the buffer */
-  buf = gst_buffer_new_and_alloc (blocksize);
+  buf = gst_buffer_try_new_and_alloc (blocksize);
+  if (G_UNLIKELY (buf == NULL)) {
+    GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", blocksize);
+    return GST_FLOW_ERROR;
+  }
 
   do {
     readbytes = read (src->fd, GST_BUFFER_DATA (buf), blocksize);
@@ -391,16 +472,16 @@
 
   /* ERRORS */
 #ifndef HAVE_WIN32
-select_error:
+poll_error:
   {
     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
-        ("select on file descriptor: %s.", g_strerror (errno)));
-    GST_DEBUG_OBJECT (psrc, "Error during select");
+        ("poll on file descriptor: %s.", g_strerror (errno)));
+    GST_DEBUG_OBJECT (psrc, "Error during poll");
     return GST_FLOW_ERROR;
   }
 stopped:
   {
-    GST_DEBUG_OBJECT (psrc, "Select stopped");
+    GST_DEBUG_OBJECT (psrc, "Poll stopped");
     return GST_FLOW_WRONG_STATE;
   }
 #endif
@@ -421,6 +502,28 @@
 }
 
 static gboolean
+gst_fd_src_query (GstBaseSrc * basesrc, GstQuery * query)
+{
+  gboolean ret = FALSE;
+  GstFdSrc *src = GST_FD_SRC (basesrc);
+
+  switch (GST_QUERY_TYPE (query)) {
+    case GST_QUERY_URI:
+      gst_query_set_uri (query, src->uri);
+      ret = TRUE;
+      break;
+    default:
+      ret = FALSE;
+      break;
+  }
+
+  if (!ret)
+    ret = GST_BASE_SRC_CLASS (parent_class)->query (basesrc, query);
+
+  return ret;
+}
+
+static gboolean
 gst_fd_src_is_seekable (GstBaseSrc * bsrc)
 {
   GstFdSrc *src = GST_FD_SRC (bsrc);
@@ -482,16 +585,13 @@
 }
 
 /*** GSTURIHANDLER INTERFACE *************************************************/
-#ifdef __SYMBIAN32__
-GstURIType
-#else
-static guint
-#endif
 
+static GstURIType
 gst_fd_src_uri_get_type (void)
 {
   return GST_URI_SRC;
 }
+
 static gchar **
 gst_fd_src_uri_get_protocols (void)
 {
@@ -499,6 +599,7 @@
 
   return protocols;
 }
+
 static const gchar *
 gst_fd_src_uri_get_uri (GstURIHandler * handler)
 {