--- a/gstreamer_core/gst/gstpoll.c Wed Mar 31 22:03:18 2010 +0300
+++ b/gstreamer_core/gst/gstpoll.c Tue Aug 31 15:30:33 2010 +0300
@@ -68,6 +68,8 @@
#include <sys/time.h>
#endif
+#include "gst_private.h"
+
#include <sys/types.h>
#ifdef HAVE_UNISTD_H
@@ -89,8 +91,6 @@
/* OS/X needs this because of bad headers */
#include <string.h>
-#include "gst_private.h"
-
#include "gstpoll.h"
#ifdef __SYMBIAN32__
@@ -112,15 +112,25 @@
/* the poll/select call is also performed on a control socket, that way
* we can send special commands to control it
*/
-#define SEND_COMMAND(set, command) \
+/* FIXME: Shouldn't we check or return the return value
+ * of write()?
+ */
+#define SEND_COMMAND(set, command, result) \
G_STMT_START { \
unsigned char c = command; \
- write (set->control_write_fd.fd, &c, 1); \
+ result = write (set->control_write_fd.fd, &c, 1); \
+ if (result > 0) \
+ set->control_pending++; \
} G_STMT_END
-#define READ_COMMAND(set, command, res) \
-G_STMT_START { \
- res = read (set->control_read_fd.fd, &command, 1); \
+#define READ_COMMAND(set, command, res) \
+G_STMT_START { \
+ if (set->control_pending > 0) { \
+ res = read (set->control_read_fd.fd, &command, 1); \
+ if (res == 1) \
+ set->control_pending--; \
+ } else \
+ res = 0; \
} G_STMT_END
#define GST_POLL_CMD_WAKEUP 'W' /* restart the poll/select call */
@@ -160,7 +170,6 @@
GstPollFD control_write_fd;
#else
GArray *active_fds_ignored;
-
GArray *events;
GArray *active_events;
@@ -169,8 +178,10 @@
gboolean controllable;
gboolean new_controllable;
- gboolean waiting;
+ guint waiting;
+ guint control_pending;
gboolean flushing;
+ gboolean timer;
};
static gint
@@ -298,7 +309,7 @@
FD_ZERO (writefds);
g_mutex_lock (set->lock);
-
+
for (i = 0; i < set->active_fds->len; i++) {
#ifndef __SYMBIAN32__
struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
@@ -316,8 +327,7 @@
}
}
- g_mutex_unlock (set->lock);
-
+ g_mutex_unlock (set->lock);
return max_fd;
}
@@ -502,7 +512,7 @@
{
GstPoll *nset;
- nset = g_new0 (GstPoll, 1);
+ nset = g_slice_new0 (GstPoll);
nset->lock = g_mutex_new ();
#ifndef G_OS_WIN32
nset->mode = GST_POLL_MODE_AUTO;
@@ -537,6 +547,40 @@
}
/**
+ * gst_poll_new_timer:
+ *
+ * Create a new poll object that can be used for scheduling cancellable
+ * timeouts.
+ *
+ * A timeout is performed with gst_poll_wait(). Multiple timeouts can be
+ * performed from different threads.
+ *
+ * Returns: a new #GstPoll, or %NULL in case of an error. Free with
+ * gst_poll_free().
+ *
+ * Since: 0.10.23
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+GstPoll *
+gst_poll_new_timer (void)
+{
+ GstPoll *poll;
+
+ /* make a new controllable poll set */
+ if (!(poll = gst_poll_new (TRUE)))
+ goto done;
+
+ /* we are a timer */
+ poll->timer = TRUE;
+
+done:
+ return poll;
+}
+
+/**
* gst_poll_free:
* @set: a file descriptor set.
*
@@ -575,7 +619,7 @@
g_array_free (set->active_fds, TRUE);
g_array_free (set->fds, TRUE);
g_mutex_free (set->lock);
- g_free (set);
+ g_slice_free (GstPoll, set);
}
/**
@@ -756,7 +800,8 @@
else
pfd->events &= ~POLLOUT;
#else
- gst_poll_update_winsock_event_mask (set, idx, FD_WRITE, active);
+ gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT,
+ active);
#endif
}
@@ -812,7 +857,9 @@
#ifdef __SYMBIAN32__
EXPORT_C
#endif
- gboolean gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
+
+gboolean
+gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
{
gboolean ret;
@@ -849,7 +896,8 @@
EXPORT_C
#endif
- void gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
+void
+gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
{
#ifdef G_OS_WIN32
gint idx;
@@ -886,7 +934,8 @@
EXPORT_C
#endif
- gboolean gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
+gboolean
+gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
{
gboolean res = FALSE;
gint idx;
@@ -959,7 +1008,8 @@
res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
(wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
(wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
- (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0);
+ (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) ||
+ (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0);
#endif
}
@@ -1006,7 +1056,8 @@
EXPORT_C
#endif
-gboolean gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
+gboolean
+gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
{
gboolean res = FALSE;
@@ -1038,7 +1089,8 @@
EXPORT_C
#endif
-gboolean gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
+gboolean
+gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
{
gboolean res = FALSE;
gint idx;
@@ -1113,8 +1165,13 @@
* Wait for activity on the file descriptors in @set. This function waits up to
* the specified @timeout. A timeout of #GST_CLOCK_TIME_NONE waits forever.
*
- * When this function is called from multiple threads, -1 will be returned with
- * errno set to EPERM.
+ * For #GstPoll objects created with gst_poll_new(), this function can only be
+ * called from a single thread at a time. If called from multiple threads,
+ * -1 will be returned with errno set to EPERM.
+ *
+ * This is not true for timer #GstPoll objects created with
+ * gst_poll_new_timer(), where it is allowed to have multiple threads waiting
+ * simultaneously.
*
* Returns: The number of #GstPollFD in @set that have activity or 0 when no
* activity was detected after @timeout. If an error occurs, -1 is returned
@@ -1126,24 +1183,26 @@
EXPORT_C
#endif
-gint gst_poll_wait (GstPoll * set, GstClockTime timeout)
+gint
+gst_poll_wait (GstPoll * set, GstClockTime timeout)
{
gboolean restarting;
- int res = -1;
+ int res;
g_return_val_if_fail (set != NULL, -1);
g_mutex_lock (set->lock);
- /* we cannot wait from multiple threads */
- if (set->waiting)
+ /* we cannot wait from multiple threads unless we are a timer */
+ if (G_UNLIKELY (set->waiting > 0 && !set->timer))
goto already_waiting;
/* flushing, exit immediatly */
- if (set->flushing)
+ if (G_UNLIKELY (set->flushing))
goto flushing;
- set->waiting = TRUE;
+ /* add one more waiter */
+ set->waiting++;
do {
GstPollMode mode;
@@ -1240,7 +1299,7 @@
} else {
tvptr = NULL;
}
-//temporary fix for multifdsink
+
FD_ZERO(&excepfds);
if( max_fd != -1)
@@ -1332,20 +1391,21 @@
g_mutex_lock (set->lock);
- gst_poll_check_ctrl_commands (set, res, &restarting);
+ if (!set->timer)
+ gst_poll_check_ctrl_commands (set, res, &restarting);
/* update the controllable state if needed */
set->controllable = set->new_controllable;
- if (set->flushing) {
+ if (G_UNLIKELY (set->flushing)) {
/* we got woken up and we are flushing, we need to stop */
errno = EBUSY;
res = -1;
break;
}
- } while (restarting);
+ } while (G_UNLIKELY (restarting));
- set->waiting = FALSE;
+ set->waiting--;
g_mutex_unlock (set->lock);
@@ -1367,6 +1427,7 @@
#ifdef G_OS_WIN32
winsock_error:
{
+ set->waiting--;
g_mutex_unlock (set->lock);
return -1;
}
@@ -1390,7 +1451,8 @@
EXPORT_C
#endif
-gboolean gst_poll_set_controllable (GstPoll * set, gboolean controllable)
+gboolean
+gst_poll_set_controllable (GstPoll * set, gboolean controllable)
{
g_return_val_if_fail (set != NULL, FALSE);
@@ -1403,6 +1465,7 @@
if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
goto no_socket_pair;
*/
+ pipe(control_sock);
fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
@@ -1418,7 +1481,7 @@
/* delay the change of the controllable state if we are waiting */
set->new_controllable = controllable;
- if (!set->waiting)
+ if (set->waiting == 0)
set->controllable = controllable;
g_mutex_unlock (set->lock);
@@ -1450,17 +1513,20 @@
EXPORT_C
#endif
-void gst_poll_restart (GstPoll * set)
+void
+gst_poll_restart (GstPoll * set)
{
g_return_if_fail (set != NULL);
g_mutex_lock (set->lock);
- if (set->controllable && set->waiting) {
+ if (set->controllable && set->waiting > 0) {
#ifndef G_OS_WIN32
+ gint result;
+
/* if we are waiting, we can send the command, else we do not have to
* bother, future calls will automatically pick up the new fdset */
- SEND_COMMAND (set, GST_POLL_CMD_WAKEUP);
+ SEND_COMMAND (set, GST_POLL_CMD_WAKEUP, result);
#else
SetEvent (set->wakeup_event);
#endif
@@ -1485,7 +1551,8 @@
EXPORT_C
#endif
-void gst_poll_set_flushing (GstPoll * set, gboolean flushing)
+void
+gst_poll_set_flushing (GstPoll * set, gboolean flushing)
{
g_return_if_fail (set != NULL);
@@ -1494,12 +1561,14 @@
/* update the new state first */
set->flushing = flushing;
- if (flushing && set->controllable && set->waiting) {
+ if (flushing && set->controllable && set->waiting > 0) {
/* we are flushing, controllable and waiting, wake up the waiter. When we
* stop the flushing operation we don't clear the wakeup fd here, this will
* happen in the _wait() thread. */
#ifndef G_OS_WIN32
- SEND_COMMAND (set, GST_POLL_CMD_WAKEUP);
+ gint result;
+
+ SEND_COMMAND (set, GST_POLL_CMD_WAKEUP, result);
#else
SetEvent (set->wakeup_event);
#endif
@@ -1507,3 +1576,88 @@
g_mutex_unlock (set->lock);
}
+
+/**
+ * gst_poll_write_control:
+ * @set: a #GstPoll.
+ *
+ * Write a byte to the control socket of the controllable @set.
+ * This function is mostly useful for timer #GstPoll objects created with
+ * gst_poll_new_timer().
+ *
+ * It will make any current and future gst_poll_wait() function return with
+ * 1, meaning the control socket is set. After an equal amount of calls to
+ * gst_poll_read_control() have been performed, calls to gst_poll_wait() will
+ * block again until their timeout expired.
+ *
+ * Returns: %TRUE on success. %FALSE when @set is not controllable or when the
+ * byte could not be written.
+ *
+ * Since: 0.10.23
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+gboolean
+gst_poll_write_control (GstPoll * set)
+{
+ gboolean res = FALSE;
+
+ g_return_val_if_fail (set != NULL, FALSE);
+
+ g_mutex_lock (set->lock);
+ if (set->controllable) {
+#ifndef G_OS_WIN32
+ gint result;
+
+ SEND_COMMAND (set, GST_POLL_CMD_WAKEUP, result);
+ res = (result > 0);
+#else
+ res = SetEvent (set->wakeup_event);
+#endif
+ }
+ g_mutex_unlock (set->lock);
+
+ return res;
+}
+
+/**
+ * gst_poll_read_control:
+ * @set: a #GstPoll.
+ *
+ * Read a byte from the control socket of the controllable @set.
+ * This function is mostly useful for timer #GstPoll objects created with
+ * gst_poll_new_timer().
+ *
+ * Returns: %TRUE on success. %FALSE when @set is not controllable or when there
+ * was no byte to read.
+ *
+ * Since: 0.10.23
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+gboolean
+gst_poll_read_control (GstPoll * set)
+{
+ gboolean res = FALSE;
+
+ g_return_val_if_fail (set != NULL, FALSE);
+
+ g_mutex_lock (set->lock);
+ if (set->controllable) {
+#ifndef G_OS_WIN32
+ guchar cmd;
+ gint result;
+ READ_COMMAND (set, cmd, result);
+ res = (result > 0);
+#else
+ res = ResetEvent (set->wakeup_event);
+#endif
+ }
+ g_mutex_unlock (set->lock);
+
+ return res;
+}