gstreamer_core/gst/gstpoll.c
branchRCL_3
changeset 7 567bb019e3e3
parent 0 0e761a78d257
child 8 7e817e7e631c
--- a/gstreamer_core/gst/gstpoll.c	Wed Mar 31 22:03:18 2010 +0300
+++ b/gstreamer_core/gst/gstpoll.c	Tue Aug 31 15:30:33 2010 +0300
@@ -68,6 +68,8 @@
 #include <sys/time.h>
 #endif
 
+#include "gst_private.h"
+
 #include <sys/types.h>
 
 #ifdef HAVE_UNISTD_H
@@ -89,8 +91,6 @@
 /* OS/X needs this because of bad headers */
 #include <string.h>
 
-#include "gst_private.h"
-
 #include "gstpoll.h"
 
 #ifdef __SYMBIAN32__
@@ -112,15 +112,25 @@
 /* the poll/select call is also performed on a control socket, that way
  * we can send special commands to control it
  */
-#define SEND_COMMAND(set, command)                   \
+/* FIXME: Shouldn't we check or return the return value
+ * of write()?
+ */
+#define SEND_COMMAND(set, command, result)           \
 G_STMT_START {                                       \
   unsigned char c = command;                         \
-  write (set->control_write_fd.fd, &c, 1);           \
+  result = write (set->control_write_fd.fd, &c, 1);  \
+  if (result > 0)                                    \
+    set->control_pending++;                          \
 } G_STMT_END
 
-#define READ_COMMAND(set, command, res)              \
-G_STMT_START {                                       \
-  res = read (set->control_read_fd.fd, &command, 1); \
+#define READ_COMMAND(set, command, res)                \
+G_STMT_START {                                         \
+  if (set->control_pending > 0) {                      \
+    res = read (set->control_read_fd.fd, &command, 1); \
+    if (res == 1)                                      \
+      set->control_pending--;                          \
+  } else                                               \
+    res = 0;                                           \
 } G_STMT_END
 
 #define GST_POLL_CMD_WAKEUP  'W'        /* restart the poll/select call */
@@ -160,7 +170,6 @@
   GstPollFD control_write_fd;
 #else
   GArray *active_fds_ignored;
-
   GArray *events;
   GArray *active_events;
 
@@ -169,8 +178,10 @@
 
   gboolean controllable;
   gboolean new_controllable;
-  gboolean waiting;
+  guint waiting;
+  guint control_pending;
   gboolean flushing;
+  gboolean timer;
 };
 
 static gint
@@ -298,7 +309,7 @@
   FD_ZERO (writefds);
 
   g_mutex_lock (set->lock);
-  
+
   for (i = 0; i < set->active_fds->len; i++) {
 #ifndef __SYMBIAN32__ 
     struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
@@ -316,8 +327,7 @@
     }
   }
 
-    g_mutex_unlock (set->lock);
-
+  g_mutex_unlock (set->lock);
 
   return max_fd;
 }
@@ -502,7 +512,7 @@
 {
   GstPoll *nset;
 
-  nset = g_new0 (GstPoll, 1);
+  nset = g_slice_new0 (GstPoll);
   nset->lock = g_mutex_new ();
 #ifndef G_OS_WIN32
   nset->mode = GST_POLL_MODE_AUTO;
@@ -537,6 +547,40 @@
 }
 
 /**
+ * gst_poll_new_timer:
+ *
+ * Create a new poll object that can be used for scheduling cancellable
+ * timeouts.
+ *
+ * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
+ * performed from different threads. 
+ *
+ * Returns: a new #GstPoll, or %NULL in case of an error. Free with
+ * gst_poll_free().
+ *
+ * Since: 0.10.23
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+ 
+GstPoll *
+gst_poll_new_timer (void)
+{
+  GstPoll *poll;
+
+  /* make a new controllable poll set */
+  if (!(poll = gst_poll_new (TRUE)))
+    goto done;
+
+  /* we are a timer */
+  poll->timer = TRUE;
+
+done:
+  return poll;
+}
+
+/**
  * gst_poll_free:
  * @set: a file descriptor set.
  *
@@ -575,7 +619,7 @@
   g_array_free (set->active_fds, TRUE);
   g_array_free (set->fds, TRUE);
   g_mutex_free (set->lock);
-  g_free (set);
+  g_slice_free (GstPoll, set);
 }
 
 /**
@@ -756,7 +800,8 @@
     else
       pfd->events &= ~POLLOUT;
 #else
-    gst_poll_update_winsock_event_mask (set, idx, FD_WRITE, active);
+    gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
+        active);
 #endif
   }
 
@@ -812,7 +857,9 @@
 #ifdef __SYMBIAN32__
 EXPORT_C
 #endif
- gboolean gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
+
+gboolean
+gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
 {
   gboolean ret;
 
@@ -849,7 +896,8 @@
 EXPORT_C
 #endif
 
- void gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
+void
+gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
 {
 #ifdef G_OS_WIN32
   gint idx;
@@ -886,7 +934,8 @@
 EXPORT_C
 #endif
 
- gboolean gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
+gboolean
+gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
 {
   gboolean res = FALSE;
   gint idx;
@@ -959,7 +1008,8 @@
     res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
         (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
         (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
-        (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0);
+        (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
+        (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
 #endif
   }
 
@@ -1006,7 +1056,8 @@
 EXPORT_C
 #endif
 
-gboolean gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
+gboolean
+gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
 {
   gboolean res = FALSE;
 
@@ -1038,7 +1089,8 @@
 EXPORT_C
 #endif
 
-gboolean gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
+gboolean
+gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
 {
   gboolean res = FALSE;
   gint idx;
@@ -1113,8 +1165,13 @@
  * Wait for activity on the file descriptors in @set. This function waits up to
  * the specified @timeout.  A timeout of #GST_CLOCK_TIME_NONE waits forever.
  *
- * When this function is called from multiple threads, -1 will be returned with
- * errno set to EPERM.
+ * For #GstPoll objects created with gst_poll_new(), this function can only be
+ * called from a single thread at a time.  If called from multiple threads,
+ * -1 will be returned with errno set to EPERM.
+ *
+ * This is not true for timer #GstPoll objects created with
+ * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
+ * simultaneously.
  *
  * Returns: The number of #GstPollFD in @set that have activity or 0 when no
  * activity was detected after @timeout. If an error occurs, -1 is returned
@@ -1126,24 +1183,26 @@
 EXPORT_C
 #endif
 
-gint gst_poll_wait (GstPoll * set, GstClockTime timeout)
+gint
+gst_poll_wait (GstPoll * set, GstClockTime timeout)
 {
   gboolean restarting;
-  int res = -1;
+  int res;
 
   g_return_val_if_fail (set != NULL, -1);
 
   g_mutex_lock (set->lock);
 
-  /* we cannot wait from multiple threads */
-  if (set->waiting)
+  /* we cannot wait from multiple threads unless we are a timer */
+  if (G_UNLIKELY (set->waiting > 0 && !set->timer))
     goto already_waiting;
 
   /* flushing, exit immediatly */
-  if (set->flushing)
+  if (G_UNLIKELY (set->flushing))
     goto flushing;
 
-  set->waiting = TRUE;
+  /* add one more waiter */
+  set->waiting++;
 
   do {
     GstPollMode mode;
@@ -1240,7 +1299,7 @@
           } else {
             tvptr = NULL;
           }
-//temporary  fix for multifdsink  
+	  
           FD_ZERO(&excepfds);
          
         if( max_fd != -1)
@@ -1332,20 +1391,21 @@
 
     g_mutex_lock (set->lock);
 
-    gst_poll_check_ctrl_commands (set, res, &restarting);
+    if (!set->timer)
+      gst_poll_check_ctrl_commands (set, res, &restarting);
 
     /* update the controllable state if needed */
     set->controllable = set->new_controllable;
 
-    if (set->flushing) {
+    if (G_UNLIKELY (set->flushing)) {
       /* we got woken up and we are flushing, we need to stop */
       errno = EBUSY;
       res = -1;
       break;
     }
-  } while (restarting);
+  } while (G_UNLIKELY (restarting));
 
-  set->waiting = FALSE;
+  set->waiting--;
 
   g_mutex_unlock (set->lock);
 
@@ -1367,6 +1427,7 @@
 #ifdef G_OS_WIN32
 winsock_error:
   {
+    set->waiting--;
     g_mutex_unlock (set->lock);
     return -1;
   }
@@ -1390,7 +1451,8 @@
 EXPORT_C
 #endif
 
-gboolean gst_poll_set_controllable (GstPoll * set, gboolean controllable)
+gboolean
+gst_poll_set_controllable (GstPoll * set, gboolean controllable)
 {
   g_return_val_if_fail (set != NULL, FALSE);
 
@@ -1403,6 +1465,7 @@
     if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
       goto no_socket_pair;
 */
+    pipe(control_sock);
     fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
     fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
 
@@ -1418,7 +1481,7 @@
 
   /* delay the change of the controllable state if we are waiting */
   set->new_controllable = controllable;
-  if (!set->waiting)
+  if (set->waiting == 0)
     set->controllable = controllable;
 
   g_mutex_unlock (set->lock);
@@ -1450,17 +1513,20 @@
 EXPORT_C
 #endif
 
-void gst_poll_restart (GstPoll * set)
+void
+gst_poll_restart (GstPoll * set)
 {
   g_return_if_fail (set != NULL);
 
   g_mutex_lock (set->lock);
 
-  if (set->controllable && set->waiting) {
+  if (set->controllable && set->waiting > 0) {
 #ifndef G_OS_WIN32
+    gint result;
+
     /* if we are waiting, we can send the command, else we do not have to
      * bother, future calls will automatically pick up the new fdset */
-    SEND_COMMAND (set, GST_POLL_CMD_WAKEUP);
+    SEND_COMMAND (set, GST_POLL_CMD_WAKEUP, result);
 #else
     SetEvent (set->wakeup_event);
 #endif
@@ -1485,7 +1551,8 @@
 EXPORT_C
 #endif
 
-void gst_poll_set_flushing (GstPoll * set, gboolean flushing)
+void
+gst_poll_set_flushing (GstPoll * set, gboolean flushing)
 {
   g_return_if_fail (set != NULL);
 
@@ -1494,12 +1561,14 @@
   /* update the new state first */
   set->flushing = flushing;
 
-  if (flushing && set->controllable && set->waiting) {
+  if (flushing && set->controllable && set->waiting > 0) {
     /* we are flushing, controllable and waiting, wake up the waiter. When we
      * stop the flushing operation we don't clear the wakeup fd here, this will
      * happen in the _wait() thread. */
 #ifndef G_OS_WIN32
-    SEND_COMMAND (set, GST_POLL_CMD_WAKEUP);
+    gint result;
+
+    SEND_COMMAND (set, GST_POLL_CMD_WAKEUP, result);
 #else
     SetEvent (set->wakeup_event);
 #endif
@@ -1507,3 +1576,88 @@
 
   g_mutex_unlock (set->lock);
 }
+
+/**
+ * gst_poll_write_control:
+ * @set: a #GstPoll.
+ *
+ * Write a byte to the control socket of the controllable @set.
+ * This function is mostly useful for timer #GstPoll objects created with
+ * gst_poll_new_timer(). 
+ *
+ * It will make any current and future gst_poll_wait() function return with
+ * 1, meaning the control socket is set. After an equal amount of calls to
+ * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
+ * block again until their timeout expired.
+ *
+ * Returns: %TRUE on success. %FALSE when @set is not controllable or when the
+ * byte could not be written.
+ *
+ * Since: 0.10.23
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+gboolean
+gst_poll_write_control (GstPoll * set)
+{
+  gboolean res = FALSE;
+
+  g_return_val_if_fail (set != NULL, FALSE);
+
+  g_mutex_lock (set->lock);
+  if (set->controllable) {
+#ifndef G_OS_WIN32
+    gint result;
+
+    SEND_COMMAND (set, GST_POLL_CMD_WAKEUP, result);
+    res = (result > 0);
+#else
+    res = SetEvent (set->wakeup_event);
+#endif
+  }
+  g_mutex_unlock (set->lock);
+
+  return res;
+}
+
+/**
+ * gst_poll_read_control:
+ * @set: a #GstPoll.
+ *
+ * Read a byte from the control socket of the controllable @set.
+ * This function is mostly useful for timer #GstPoll objects created with
+ * gst_poll_new_timer(). 
+ *
+ * Returns: %TRUE on success. %FALSE when @set is not controllable or when there
+ * was no byte to read.
+ *
+ * Since: 0.10.23
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+gboolean
+gst_poll_read_control (GstPoll * set)
+{
+  gboolean res = FALSE;
+
+  g_return_val_if_fail (set != NULL, FALSE);
+
+  g_mutex_lock (set->lock);
+  if (set->controllable) {
+#ifndef G_OS_WIN32
+    guchar cmd;
+    gint result;
+    READ_COMMAND (set, cmd, result);
+    res = (result > 0);
+#else
+    res = ResetEvent (set->wakeup_event);
+#endif
+  }
+  g_mutex_unlock (set->lock);
+
+  return res;
+}