--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/gstreamer_core/gst/gstpoll.c Thu Dec 17 08:53:32 2009 +0200
@@ -0,0 +1,1509 @@
+/* GStreamer
+ * Copyright (C) 1999 Erik Walthinsen <omega@cse.ogi.edu>
+ * Copyright (C) 2004 Wim Taymans <wim.taymans@gmail.com>
+ * Copyright (C) 2007 Peter Kjellerstedt <pkj@axis.com>
+ * Copyright (C) 2008 Ole André Vadla Ravnås <ole.andre.ravnas@tandberg.com>
+ *
+ * gstpoll.c: File descriptor set
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+/**
+ * SECTION:gstpoll
+ * @short_description: Keep track of file descriptors and make it possible
+ * to wait on them in a cancelable way
+ *
+ * A #GstPoll keeps track of file descriptors much like fd_set (used with
+ * select()) or a struct pollfd array (used with poll()). Once created with
+ * gst_poll_new(), the set can be used to wait for file descriptors to be
+ * readable and/or writeable. It is possible to make this wait be controlled
+ * by specifying %TRUE for the @controllable flag when creating the set (or
+ * later calling gst_poll_set_controllable()).
+ *
+ * New file descriptors are added to the set using gst_poll_add_fd(), and
+ * removed using gst_poll_remove_fd(). Controlling which file descriptors
+ * should be waited for to become readable and/
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+or writeable are done using
+ * gst_poll_fd_ctl_read() and gst_poll_fd_ctl_write().
+ *
+ * Use gst_poll_wait() to wait for the file descriptors to actually become
+ * readable and/or writeable, or to timeout if no file descriptor is available
+ * in time. The wait can be controlled by calling gst_poll_restart() and
+ * gst_poll_set_flushing().
+ *
+ * Once the file descriptor set has been waited for, one can use
+ * gst_poll_fd_has_closed() to see if the file descriptor has been closed,
+ * gst_poll_fd_has_error() to see if it has generated an error,
+ * gst_poll_fd_can_read() to see if it is possible to read from the file
+ * descriptor, and gst_poll_fd_can_write() to see if it is possible to
+ * write to it.
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#ifndef _MSC_VER
+#define _GNU_SOURCE 1
+#ifndef __SYMBIAN32__
+#include <sys/poll.h>
+#endif
+#include <sys/time.h>
+#endif
+
+#include <sys/types.h>
+
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+
+#include <errno.h>
+#include <fcntl.h>
+
+#include <glib.h>
+
+#ifdef G_OS_WIN32
+#include <winsock2.h>
+#define EINPROGRESS WSAEINPROGRESS
+#else
+#include <sys/socket.h>
+#endif
+
+/* OS/X needs this because of bad headers */
+#include <string.h>
+
+#include "gst_private.h"
+
+#include "gstpoll.h"
+
+#ifdef __SYMBIAN32__
+#include <glib_global.h>
+#include <sys/select.h>
+#include <garray.h>
+#include <glibconfig.h>
+#include <gthread.h>
+
+#define POLLIN 1
+#define POLLOUT 4
+#define POLLPRI 2
+#define POLLHUP 6
+#define POLLERR 8
+#define POLLNVAL 32
+
+#endif
+#ifndef G_OS_WIN32
+/* the poll/select call is also performed on a control socket, that way
+ * we can send special commands to control it
+ */
+#define SEND_COMMAND(set, command) \
+G_STMT_START { \
+ unsigned char c = command; \
+ write (set->control_write_fd.fd, &c, 1); \
+} G_STMT_END
+
+#define READ_COMMAND(set, command, res) \
+G_STMT_START { \
+ res = read (set->control_read_fd.fd, &command, 1); \
+} G_STMT_END
+
+#define GST_POLL_CMD_WAKEUP 'W' /* restart the poll/select call */
+
+#else /* G_OS_WIN32 */
+typedef struct _WinsockFd WinsockFd;
+
+struct _WinsockFd
+{
+ gint fd;
+ glong event_mask;
+ WSANETWORKEVENTS events;
+ glong ignored_event_mask;
+};
+#endif
+
+typedef enum
+{
+ GST_POLL_MODE_AUTO,
+ GST_POLL_MODE_SELECT,
+ GST_POLL_MODE_PSELECT,
+ GST_POLL_MODE_POLL,
+ GST_POLL_MODE_PPOLL,
+ GST_POLL_MODE_WINDOWS
+} GstPollMode;
+
+struct _GstPoll
+{
+ GstPollMode mode;
+
+ GMutex *lock;
+
+ GArray *fds;
+ GArray *active_fds;
+#ifndef G_OS_WIN32
+ GstPollFD control_read_fd;
+ GstPollFD control_write_fd;
+#else
+ GArray *active_fds_ignored;
+
+ GArray *events;
+ GArray *active_events;
+
+ HANDLE wakeup_event;
+#endif
+
+ gboolean controllable;
+ gboolean new_controllable;
+ gboolean waiting;
+ gboolean flushing;
+};
+
+static gint
+find_index (GArray * array, GstPollFD * fd)
+{
+#ifndef G_OS_WIN32
+ //struct pollfd *ifd;
+GstPollFD *ifd;
+
+#else
+ WinsockFd *ifd;
+#endif
+ guint i;
+
+ /* start by assuming the index found in the fd is still valid */
+ if (fd->idx >= 0 && fd->idx < array->len) {
+#ifndef G_OS_WIN32
+#ifndef __SYMBIAN32__
+ ifd = &g_array_index (array, struct pollfd, fd->idx);
+#else
+ ifd = &g_array_index (array, GstPollFD, fd->idx);
+#endif
+#else
+ ifd = &g_array_index (array, WinsockFd, fd->idx);
+#endif
+
+ if (ifd->fd == fd->fd) {
+ return fd->idx;
+ }
+ }
+
+ /* the pollfd array has changed and we need to lookup the fd again */
+ for (i = 0; i < array->len; i++) {
+#ifndef G_OS_WIN32
+#ifndef __SYMBIAN32__
+ ifd = &g_array_index (array, struct pollfd, i);
+#else
+ ifd = &g_array_index (array, GstPollFD, i);
+#endif
+#else
+ ifd = &g_array_index (array, WinsockFd, i);
+#endif
+
+ if (ifd->fd == fd->fd) {
+ fd->idx = (gint) i;
+ return fd->idx;
+ }
+ }
+
+ fd->idx = -1;
+ return fd->idx;
+}
+
+#if !defined(HAVE_PPOLL) && defined(HAVE_POLL)
+/* check if all file descriptors will fit in an fd_set */
+static gboolean
+selectable_fds (const GstPoll * set)
+{
+ guint i;
+
+ for (i = 0; i < set->fds->len; i++) {
+ struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
+
+ if (pfd->fd >= FD_SETSIZE)
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+/* check if the timeout will convert to a timeout value used for poll()
+ * without a loss of precision
+ */
+static gboolean
+pollable_timeout (GstClockTime timeout)
+{
+ if (timeout == GST_CLOCK_TIME_NONE)
+ return TRUE;
+
+ /* not a nice multiple of milliseconds */
+ if (timeout % 1000000)
+ return FALSE;
+
+ return TRUE;
+}
+#endif
+
+static GstPollMode
+choose_mode (const GstPoll * set, GstClockTime timeout)
+{
+ GstPollMode mode;
+
+ if (set->mode == GST_POLL_MODE_AUTO) {
+#ifdef HAVE_PPOLL
+ mode = GST_POLL_MODE_PPOLL;
+#elif defined(HAVE_POLL)
+ if (!selectable_fds (set) || pollable_timeout (timeout)) {
+ mode = GST_POLL_MODE_POLL;
+ } else {
+#ifdef HAVE_PSELECT
+ mode = GST_POLL_MODE_PSELECT;
+#else
+ mode = GST_POLL_MODE_SELECT;
+#endif
+ }
+#elif defined(HAVE_PSELECT)
+ mode = GST_POLL_MODE_PSELECT;
+#else
+ mode = GST_POLL_MODE_SELECT;
+#endif
+ } else {
+ mode = set->mode;
+ }
+ return mode;
+}
+
+#ifndef G_OS_WIN32
+static gint
+pollfd_to_fd_set (GstPoll * set, fd_set * readfds, fd_set * writefds)
+{
+ gint max_fd = -1;
+ guint i;
+
+ FD_ZERO (readfds);
+ FD_ZERO (writefds);
+
+ g_mutex_lock (set->lock);
+
+ for (i = 0; i < set->active_fds->len; i++) {
+#ifndef __SYMBIAN32__
+ struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i);
+#else
+ GPollFD *pfd = &g_array_index (set->fds, GPollFD, i);
+#endif
+
+ if (pfd->fd < FD_SETSIZE) {
+ if (pfd->events & POLLIN)
+ FD_SET (pfd->fd, readfds);
+ if (pfd->events & POLLOUT)
+ FD_SET (pfd->fd, writefds);
+ if (pfd->fd > max_fd)
+ max_fd = pfd->fd;
+ }
+ }
+
+ g_mutex_unlock (set->lock);
+
+
+ return max_fd;
+}
+
+static void
+fd_set_to_pollfd (GstPoll * set, fd_set * readfds, fd_set * writefds)
+{
+ guint i;
+
+ g_mutex_lock (set->lock);
+
+ for (i = 0; i < set->active_fds->len; i++) {
+ //rj struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, i);
+ GPollFD *pfd = &g_array_index (set->active_fds, GPollFD, i);
+
+ if (pfd->fd < FD_SETSIZE) {
+ if (FD_ISSET (pfd->fd, readfds))
+ pfd->revents |= POLLIN;
+ if (FD_ISSET (pfd->fd, writefds))
+ pfd->revents |= POLLOUT;
+ }
+ }
+
+ g_mutex_unlock (set->lock);
+}
+#else /* G_OS_WIN32 */
+/*
+ * Translate errors thrown by the Winsock API used by GstPoll:
+ * WSAEventSelect, WSAWaitForMultipleEvents and WSAEnumNetworkEvents
+ */
+static gint
+gst_poll_winsock_error_to_errno (DWORD last_error)
+{
+ switch (last_error) {
+ case WSA_INVALID_HANDLE:
+ case WSAEINVAL:
+ case WSAENOTSOCK:
+ return EBADF;
+
+ case WSA_NOT_ENOUGH_MEMORY:
+ return ENOMEM;
+
+ /*
+ * Anything else, including:
+ * WSA_INVALID_PARAMETER, WSAEFAULT, WSAEINPROGRESS, WSAENETDOWN,
+ * WSANOTINITIALISED
+ */
+ default:
+ return EINVAL;
+ }
+}
+
+static void
+gst_poll_free_winsock_event (GstPoll * set, gint idx)
+{
+ WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
+ HANDLE event = g_array_index (set->events, HANDLE, idx);
+
+ WSAEventSelect (wfd->fd, event, 0);
+ CloseHandle (event);
+}
+
+static void
+gst_poll_update_winsock_event_mask (GstPoll * set, gint idx, glong flags,
+ gboolean active)
+{
+ WinsockFd *wfd;
+
+ wfd = &g_array_index (set->fds, WinsockFd, idx);
+
+ if (active)
+ wfd->event_mask |= flags;
+ else
+ wfd->event_mask &= ~flags;
+
+ /* reset ignored state if the new mask doesn't overlap at all */
+ if ((wfd->ignored_event_mask & wfd->event_mask) == 0)
+ wfd->ignored_event_mask = 0;
+}
+
+static gboolean
+gst_poll_prepare_winsock_active_sets (GstPoll * set)
+{
+ guint i;
+
+ g_array_set_size (set->active_fds, 0);
+ g_array_set_size (set->active_fds_ignored, 0);
+ g_array_set_size (set->active_events, 0);
+ g_array_append_val (set->active_events, set->wakeup_event);
+
+ for (i = 0; i < set->fds->len; i++) {
+ WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, i);
+ HANDLE event = g_array_index (set->events, HANDLE, i);
+
+ if (wfd->ignored_event_mask == 0) {
+ gint ret;
+
+ g_array_append_val (set->active_fds, *wfd);
+ g_array_append_val (set->active_events, event);
+
+ ret = WSAEventSelect (wfd->fd, event, wfd->event_mask);
+ if (G_UNLIKELY (ret != 0)) {
+ errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
+ return FALSE;
+ }
+ } else {
+ g_array_append_val (set->active_fds_ignored, wfd);
+ }
+ }
+
+ return TRUE;
+}
+
+static gint
+gst_poll_collect_winsock_events (GstPoll * set)
+{
+ gint res, i;
+
+ /*
+ * We need to check which events are signaled, and call
+ * WSAEnumNetworkEvents for those that are, which resets
+ * the event and clears the internal network event records.
+ */
+ res = 0;
+ for (i = 0; i < set->active_fds->len; i++) {
+ WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, i);
+ HANDLE event = g_array_index (set->active_events, HANDLE, i + 1);
+ DWORD wait_ret;
+
+ wait_ret = WaitForSingleObject (event, 0);
+ if (wait_ret == WAIT_OBJECT_0) {
+ gint enum_ret = WSAEnumNetworkEvents (wfd->fd, event, &wfd->events);
+
+ if (G_UNLIKELY (enum_ret != 0)) {
+ res = -1;
+ errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
+ break;
+ }
+
+ res++;
+ } else {
+ /* clear any previously stored result */
+ memset (&wfd->events, 0, sizeof (wfd->events));
+ }
+ }
+
+ /* If all went well we also need to reset the ignored fds. */
+ if (res >= 0) {
+ res += set->active_fds_ignored->len;
+
+ for (i = 0; i < set->active_fds_ignored->len; i++) {
+ WinsockFd *wfd = g_array_index (set->active_fds_ignored, WinsockFd *, i);
+
+ wfd->ignored_event_mask = 0;
+ }
+
+ g_array_set_size (set->active_fds_ignored, 0);
+ }
+
+ return res;
+}
+#endif
+
+/**
+ * gst_poll_new:
+ * @controllable: whether it should be possible to control a wait.
+ *
+ * Create a new file descriptor set. If @controllable, it
+ * is possible to restart or flush a call to gst_poll_wait() with
+ * gst_poll_restart() and gst_poll_set_flushing() respectively.
+ *
+ * Returns: a new #GstPoll, or %NULL in case of an error. Free with
+ * gst_poll_free().
+ *
+ * Since: 0.10.18
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+ GstPoll *gst_poll_new (gboolean controllable)
+{
+ GstPoll *nset;
+
+ nset = g_new0 (GstPoll, 1);
+ nset->lock = g_mutex_new ();
+#ifndef G_OS_WIN32
+ nset->mode = GST_POLL_MODE_AUTO;
+ //rj nset->fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
+ //rj nset->active_fds = g_array_new (FALSE, FALSE, sizeof (struct pollfd));
+ nset->fds = g_array_new (FALSE, FALSE, sizeof (GstPollFD));
+ nset->active_fds = g_array_new (FALSE, FALSE, sizeof (GstPollFD));
+ nset->control_read_fd.fd = -1;
+ nset->control_write_fd.fd = -1;
+#else
+ nset->mode = GST_POLL_MODE_WINDOWS;
+ nset->fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
+ nset->active_fds = g_array_new (FALSE, FALSE, sizeof (WinsockFd));
+ nset->active_fds_ignored = g_array_new (FALSE, FALSE, sizeof (WinsockFd *));
+ nset->events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
+ nset->active_events = g_array_new (FALSE, FALSE, sizeof (HANDLE));
+
+ nset->wakeup_event = CreateEvent (NULL, TRUE, FALSE, NULL);
+#endif
+
+ if (!gst_poll_set_controllable (nset, controllable))
+ goto not_controllable;
+
+ return nset;
+
+ /* ERRORS */
+not_controllable:
+ {
+ gst_poll_free (nset);
+ return NULL;
+ }
+}
+
+/**
+ * gst_poll_free:
+ * @set: a file descriptor set.
+ *
+ * Free a file descriptor set.
+ *
+ * Since: 0.10.18
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+void gst_poll_free (GstPoll * set)
+{
+ g_return_if_fail (set != NULL);
+
+#ifndef G_OS_WIN32
+ if (set->control_write_fd.fd >= 0)
+ close (set->control_write_fd.fd);
+ if (set->control_read_fd.fd >= 0)
+ close (set->control_read_fd.fd);
+#else
+ CloseHandle (set->wakeup_event);
+
+ {
+ guint i;
+
+ for (i = 0; i < set->events->len; i++)
+ gst_poll_free_winsock_event (set, i);
+ }
+
+ g_array_free (set->active_events, TRUE);
+ g_array_free (set->events, TRUE);
+ g_array_free (set->active_fds_ignored, TRUE);
+#endif
+
+ g_array_free (set->active_fds, TRUE);
+ g_array_free (set->fds, TRUE);
+ g_mutex_free (set->lock);
+ g_free (set);
+}
+
+/**
+ * gst_poll_fd_init:
+ * @fd: a #GstPollFD
+ *
+ * Initializes @fd. Alternatively you can initialize it with
+ * #GST_POLL_FD_INIT.
+ *
+ * Since: 0.10.18
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+ void gst_poll_fd_init (GstPollFD * fd)
+{
+ g_return_if_fail (fd != NULL);
+
+ fd->fd = -1;
+ fd->idx = -1;
+}
+
+static gboolean
+gst_poll_add_fd_unlocked (GstPoll * set, GstPollFD * fd)
+{
+ gint idx;
+
+ idx = find_index (set->fds, fd);
+ if (idx < 0) {
+#ifndef G_OS_WIN32
+ //rj struct pollfd nfd;
+ GPollFD nfd;
+
+ nfd.fd = fd->fd;
+#ifndef __SYMBIAN32__
+ nfd.events = POLLERR | POLLNVAL | POLLHUP;
+#else
+ nfd.events = POLLERR | POLLNVAL ;
+#endif
+ nfd.revents = 0;
+
+ g_array_append_val (set->fds, nfd);
+
+ fd->idx = set->fds->len - 1;
+#else
+ WinsockFd wfd;
+ HANDLE event;
+
+ wfd.fd = fd->fd;
+ wfd.event_mask = FD_CLOSE;
+ memset (&wfd.events, 0, sizeof (wfd.events));
+ wfd.ignored_event_mask = 0;
+ event = WSACreateEvent ();
+
+ g_array_append_val (set->fds, wfd);
+ g_array_append_val (set->events, event);
+
+ fd->idx = set->fds->len - 1;
+#endif
+ }
+
+ return TRUE;
+}
+
+/**
+ * gst_poll_add_fd:
+ * @set: a file descriptor set.
+ * @fd: a file descriptor.
+ *
+ * Add a file descriptor to the file descriptor set.
+ *
+ * Returns: %TRUE if the file descriptor was successfully added to the set.
+ *
+ * Since: 0.10.18
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+ gboolean gst_poll_add_fd (GstPoll * set, GstPollFD * fd)
+{
+ gboolean ret;
+
+ g_return_val_if_fail (set != NULL, FALSE);
+ g_return_val_if_fail (fd != NULL, FALSE);
+ g_return_val_if_fail (fd->fd >= 0, FALSE);
+
+ g_mutex_lock (set->lock);
+
+ ret = gst_poll_add_fd_unlocked (set, fd);
+
+ g_mutex_unlock (set->lock);
+
+ return ret;
+}
+
+/**
+ * gst_poll_remove_fd:
+ * @set: a file descriptor set.
+ * @fd: a file descriptor.
+ *
+ * Remove a file descriptor from the file descriptor set.
+ *
+ * Returns: %TRUE if the file descriptor was successfully removed from the set.
+ *
+ * Since: 0.10.18
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+gboolean gst_poll_remove_fd (GstPoll * set, GstPollFD * fd)
+{
+ gint idx;
+
+ g_return_val_if_fail (set != NULL, FALSE);
+ g_return_val_if_fail (fd != NULL, FALSE);
+ g_return_val_if_fail (fd->fd >= 0, FALSE);
+
+ g_mutex_lock (set->lock);
+
+ /* get the index, -1 is an fd that is not added */
+ idx = find_index (set->fds, fd);
+ if (idx >= 0) {
+#ifdef G_OS_WIN32
+ gst_poll_free_winsock_event (set, idx);
+ g_array_remove_index_fast (set->events, idx);
+#endif
+
+ /* remove the fd at index, we use _remove_index_fast, which copies the last
+ * element of the array to the freed index */
+ g_array_remove_index_fast (set->fds, idx);
+
+ /* mark fd as removed by setting the index to -1 */
+ fd->idx = -1;
+ }
+
+ g_mutex_unlock (set->lock);
+
+ return idx >= 0;
+}
+
+/**
+ * gst_poll_fd_ctl_write:
+ * @set: a file descriptor set.
+ * @fd: a file descriptor.
+ * @active: a new status.
+ *
+ * Control whether the descriptor @fd in @set will be monitored for
+ * writability.
+ *
+ * Returns: %TRUE if the descriptor was successfully updated.
+ *
+ * Since: 0.10.18
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+gboolean gst_poll_fd_ctl_write (GstPoll * set, GstPollFD * fd, gboolean active)
+{
+ gint idx;
+
+ g_return_val_if_fail (set != NULL, FALSE);
+ g_return_val_if_fail (fd != NULL, FALSE);
+ g_return_val_if_fail (fd->fd >= 0, FALSE);
+
+ g_mutex_lock (set->lock);
+
+ idx = find_index (set->fds, fd);
+ if (idx >= 0) {
+#ifndef G_OS_WIN32
+ // rj struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
+ GPollFD *pfd = &g_array_index (set->fds, GPollFD, idx);
+ if (active)
+ pfd->events |= POLLOUT;
+ else
+ pfd->events &= ~POLLOUT;
+#else
+ gst_poll_update_winsock_event_mask (set, idx, FD_WRITE, active);
+#endif
+ }
+
+ g_mutex_unlock (set->lock);
+
+ return idx >= 0;
+}
+
+static gboolean
+gst_poll_fd_ctl_read_unlocked (GstPoll * set, GstPollFD * fd, gboolean active)
+{
+ gint idx;
+
+ idx = find_index (set->fds, fd);
+
+ if (idx >= 0) {
+#ifndef G_OS_WIN32
+ //rj struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, idx);
+ GPollFD *pfd = &g_array_index (set->fds, GPollFD, idx);
+
+
+ if (active)
+ {
+#ifndef __SYMBIAN32__
+ pfd->events |= (POLLIN | POLLPRI);
+#else
+ pfd->events |= (POLLIN );
+#endif
+ }
+ else
+ pfd->events &= ~(POLLIN | POLLPRI);
+#else
+ gst_poll_update_winsock_event_mask (set, idx, FD_READ | FD_ACCEPT, active);
+#endif
+ }
+
+ return idx >= 0;
+}
+
+/**
+ * gst_poll_fd_ctl_read:
+ * @set: a file descriptor set.
+ * @fd: a file descriptor.
+ * @active: a new status.
+ *
+ * Control whether the descriptor @fd in @set will be monitored for
+ * readability.
+ *
+ * Returns: %TRUE if the descriptor was successfully updated.
+ *
+ * Since: 0.10.18
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+ gboolean gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active)
+{
+ gboolean ret;
+
+ g_return_val_if_fail (set != NULL, FALSE);
+ g_return_val_if_fail (fd != NULL, FALSE);
+ g_return_val_if_fail (fd->fd >= 0, FALSE);
+
+ g_mutex_lock (set->lock);
+
+ ret = gst_poll_fd_ctl_read_unlocked (set, fd, active);
+
+ g_mutex_unlock (set->lock);
+
+ return ret;
+}
+
+/**
+ * gst_poll_fd_ignored:
+ * @set: a file descriptor set.
+ * @fd: a file descriptor.
+ *
+ * Mark @fd as ignored so that the next call to gst_poll_wait() will yield
+ * the same result for @fd as last time. This function must be called if no
+ * operation (read/write/recv/send/etc.) will be performed on @fd before
+ * the next call to gst_poll_wait().
+ *
+ * The reason why this is needed is because the underlying implementation
+ * might not allow querying the fd more than once between calls to one of
+ * the re-enabling operations.
+ *
+ * Since: 0.10.18
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+ void gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd)
+{
+#ifdef G_OS_WIN32
+ gint idx;
+
+ g_return_if_fail (set != NULL);
+ g_return_if_fail (fd != NULL);
+ g_return_if_fail (fd->fd >= 0);
+
+ g_mutex_lock (set->lock);
+
+ idx = find_index (set->fds, fd);
+ if (idx >= 0) {
+ WinsockFd *wfd = &g_array_index (set->fds, WinsockFd, idx);
+
+ wfd->ignored_event_mask = wfd->event_mask & (FD_READ | FD_WRITE);
+ }
+
+ g_mutex_unlock (set->lock);
+#endif
+}
+
+/**
+ * gst_poll_fd_has_closed:
+ * @set: a file descriptor set.
+ * @fd: a file descriptor.
+ *
+ * Check if @fd in @set has closed the connection.
+ *
+ * Returns: %TRUE if the connection was closed.
+ *
+ * Since: 0.10.18
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+ gboolean gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd)
+{
+ gboolean res = FALSE;
+ gint idx;
+
+ g_return_val_if_fail (set != NULL, FALSE);
+ g_return_val_if_fail (fd != NULL, FALSE);
+ g_return_val_if_fail (fd->fd >= 0, FALSE);
+
+ g_mutex_lock (set->lock);
+
+ idx = find_index (set->active_fds, fd);
+ if (idx >= 0) {
+#ifndef G_OS_WIN32
+ //rj struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
+ GPollFD *pfd = &g_array_index (set->active_fds, GPollFD, idx);
+ #ifndef __SYMBIAN32__
+ res = (pfd->revents & POLLHUP) != 0;
+ #else
+ res = (pfd->revents & POLLHUP) == POLLHUP;
+ #endif
+
+#else
+ WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
+
+ res = (wfd->events.lNetworkEvents & FD_CLOSE) != 0;
+#endif
+ }
+
+ g_mutex_unlock (set->lock);
+
+ return res;
+}
+
+/**
+ * gst_poll_fd_has_error:
+ * @set: a file descriptor set.
+ * @fd: a file descriptor.
+ *
+ * Check if @fd in @set has an error.
+ *
+ * Returns: %TRUE if the descriptor has an error.
+ *
+ * Since: 0.10.18
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+gboolean gst_poll_fd_has_error (const GstPoll * set, GstPollFD * fd)
+{
+ gboolean res = FALSE;
+ gint idx;
+
+ g_return_val_if_fail (set != NULL, FALSE);
+ g_return_val_if_fail (fd != NULL, FALSE);
+ g_return_val_if_fail (fd->fd >= 0, FALSE);
+
+ g_mutex_lock (set->lock);
+
+ idx = find_index (set->active_fds, fd);
+ if (idx >= 0) {
+#ifndef G_OS_WIN32
+ //rj struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
+ GPollFD *pfd = &g_array_index (set->active_fds, GPollFD, idx);
+
+ res = (pfd->revents & (POLLERR | POLLNVAL)) != 0;
+#else
+ WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
+
+ res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) ||
+ (wfd->events.iErrorCode[FD_READ_BIT] != 0) ||
+ (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) ||
+ (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0);
+#endif
+ }
+
+ g_mutex_unlock (set->lock);
+
+ return res;
+}
+
+static gboolean
+gst_poll_fd_can_read_unlocked (const GstPoll * set, GstPollFD * fd)
+{
+ gboolean res = FALSE;
+ gint idx;
+
+ idx = find_index (set->active_fds, fd);
+ if (idx >= 0) {
+#ifndef G_OS_WIN32
+ //rj struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
+ GPollFD *pfd = &g_array_index (set->active_fds, GPollFD, idx);
+
+ res = (pfd->revents & (POLLIN | POLLPRI)) != 0;
+#else
+ WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
+
+ res = (wfd->events.lNetworkEvents & (FD_READ | FD_ACCEPT)) != 0;
+#endif
+ }
+
+ return res;
+}
+
+/**
+ * gst_poll_fd_can_read:
+ * @set: a file descriptor set.
+ * @fd: a file descriptor.
+ *
+ * Check if @fd in @set has data to be read.
+ *
+ * Returns: %TRUE if the descriptor has data to be read.
+ *
+ * Since: 0.10.18
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+gboolean gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd)
+{
+ gboolean res = FALSE;
+
+ g_return_val_if_fail (set != NULL, FALSE);
+ g_return_val_if_fail (fd != NULL, FALSE);
+ g_return_val_if_fail (fd->fd >= 0, FALSE);
+
+ g_mutex_lock (set->lock);
+
+ res = gst_poll_fd_can_read_unlocked (set, fd);
+
+ g_mutex_unlock (set->lock);
+
+ return res;
+}
+
+/**
+ * gst_poll_fd_can_write:
+ * @set: a file descriptor set.
+ * @fd: a file descriptor.
+ *
+ * Check if @fd in @set can be used for writing.
+ *
+ * Returns: %TRUE if the descriptor can be used for writing.
+ *
+ * Since: 0.10.18
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+gboolean gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd)
+{
+ gboolean res = FALSE;
+ gint idx;
+
+ g_return_val_if_fail (set != NULL, FALSE);
+ g_return_val_if_fail (fd != NULL, FALSE);
+ g_return_val_if_fail (fd->fd >= 0, FALSE);
+
+ g_mutex_lock (set->lock);
+
+ idx = find_index (set->active_fds, fd);
+ if (idx >= 0) {
+#ifndef G_OS_WIN32
+ //rj struct pollfd *pfd = &g_array_index (set->active_fds, struct pollfd, idx);
+ GPollFD *pfd = &g_array_index (set->active_fds, GPollFD, idx);
+
+ //rj res = (pfd->revents & POLLOUT) != 0;
+ res = (pfd->revents & POLLOUT) != 0;
+#else
+ WinsockFd *wfd = &g_array_index (set->active_fds, WinsockFd, idx);
+
+ res = (wfd->events.lNetworkEvents & FD_WRITE) != 0;
+#endif
+ }
+
+ g_mutex_unlock (set->lock);
+
+ return res;
+}
+
+static void
+gst_poll_check_ctrl_commands (GstPoll * set, gint res, gboolean * restarting)
+{
+ /* check if the poll/select was aborted due to a command */
+ if (set->controllable) {
+#ifndef G_OS_WIN32
+ while (TRUE) {
+ guchar cmd;
+ gint result;
+
+ /* we do not check the read status of the control socket here because
+ * there may have been a write to the socket between the time the
+ * poll/select finished and before we got the mutex back, and we need
+ * to clear out the control socket before leaving */
+ READ_COMMAND (set, cmd, result);
+ if (result <= 0) {
+ /* no more commands, quit the loop */
+ break;
+ }
+
+ /* if the control socket is the only socket with activity when we get
+ * here, we restart the _wait operation, else we allow the caller to
+ * process the other file descriptors */
+ if (res == 1 &&
+ gst_poll_fd_can_read_unlocked (set, &set->control_read_fd))
+ *restarting = TRUE;
+ }
+#else
+ if (WaitForSingleObject (set->wakeup_event, 0) == WAIT_OBJECT_0) {
+ ResetEvent (set->wakeup_event);
+ *restarting = TRUE;
+ }
+#endif
+ }
+}
+
+/**
+ * gst_poll_wait:
+ * @set: a #GstPoll.
+ * @timeout: a timeout in nanoseconds.
+ *
+ * Wait for activity on the file descriptors in @set. This function waits up to
+ * the specified @timeout. A timeout of #GST_CLOCK_TIME_NONE waits forever.
+ *
+ * When this function is called from multiple threads, -1 will be returned with
+ * errno set to EPERM.
+ *
+ * Returns: The number of #GstPollFD in @set that have activity or 0 when no
+ * activity was detected after @timeout. If an error occurs, -1 is returned
+ * and errno is set.
+ *
+ * Since: 0.10.18
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+gint gst_poll_wait (GstPoll * set, GstClockTime timeout)
+{
+ gboolean restarting;
+ int res = -1;
+
+ g_return_val_if_fail (set != NULL, -1);
+
+ g_mutex_lock (set->lock);
+
+ /* we cannot wait from multiple threads */
+ if (set->waiting)
+ goto already_waiting;
+
+ /* flushing, exit immediatly */
+ if (set->flushing)
+ goto flushing;
+
+ set->waiting = TRUE;
+
+ do {
+ GstPollMode mode;
+
+ res = -1;
+ restarting = FALSE;
+
+ mode = choose_mode (set, timeout);
+
+#ifndef G_OS_WIN32
+ g_array_set_size (set->active_fds, set->fds->len);
+ //rj memcpy (set->active_fds->data, set->fds->data,
+ // set->fds->len * sizeof (struct pollfd));
+ memcpy (set->active_fds->data, set->fds->data,
+ set->fds->len * sizeof (GstPollFD));
+#else
+ if (!gst_poll_prepare_winsock_active_sets (set))
+ goto winsock_error;
+#endif
+
+ g_mutex_unlock (set->lock);
+
+ switch (mode) {
+ case GST_POLL_MODE_AUTO:
+ g_assert_not_reached ();
+ break;
+ case GST_POLL_MODE_PPOLL:
+ {
+#ifdef HAVE_PPOLL
+ struct timespec ts;
+ struct timespec *tsptr;
+
+ if (timeout != GST_CLOCK_TIME_NONE) {
+ GST_TIME_TO_TIMESPEC (timeout, ts);
+ tsptr = &ts;
+ } else {
+ tsptr = NULL;
+ }
+
+ res =
+ ppoll ((struct pollfd *) set->active_fds->data,
+ set->active_fds->len, tsptr, NULL);
+#else
+ g_assert_not_reached ();
+ errno = ENOSYS;
+#endif
+ break;
+ }
+ case GST_POLL_MODE_POLL:
+ {
+#ifdef HAVE_POLL
+ gint t;
+
+ if (timeout != GST_CLOCK_TIME_NONE) {
+ t = GST_TIME_AS_MSECONDS (timeout);
+ } else {
+ t = -1;
+ }
+
+ res =
+ poll ((struct pollfd *) set->active_fds->data,
+ set->active_fds->len, t);
+#else
+ g_assert_not_reached ();
+ errno = ENOSYS;
+#endif
+ break;
+ }
+ case GST_POLL_MODE_PSELECT:
+#ifndef HAVE_PSELECT
+ {
+ g_assert_not_reached ();
+ errno = ENOSYS;
+ break;
+ }
+#endif
+ case GST_POLL_MODE_SELECT:
+ {
+#ifndef G_OS_WIN32
+ fd_set readfds;
+ fd_set writefds;
+ fd_set excepfds;
+ gint max_fd;
+
+ max_fd = pollfd_to_fd_set (set, &readfds, &writefds);
+
+ if (mode == GST_POLL_MODE_SELECT) {
+ struct timeval tv;
+ struct timeval *tvptr;
+
+ if (timeout != GST_CLOCK_TIME_NONE) {
+ GST_TIME_TO_TIMEVAL (timeout, tv);
+ tvptr = &tv;
+ } else {
+ tvptr = NULL;
+ }
+//temporary fix for multifdsink
+ FD_ZERO(&excepfds);
+
+ if( max_fd != -1)
+ {
+ if( ! FD_ISSET(max_fd,&readfds))
+ {
+ if( ! FD_ISSET(max_fd,&writefds))
+ {
+ FD_SET(max_fd,&excepfds);
+ }
+ }
+ }
+
+ /*
+ if( max_fd != -1)
+ FD_SET(max_fd,&excepfds);
+ */
+#ifdef __SYMBIAN32__
+ if( max_fd == -1 && !tvptr);
+ else
+ res = select (max_fd + 1, &readfds, &writefds, &excepfds, tvptr);
+#else
+ res = select (max_fd + 1, &readfds, &writefds, NULL, tvptr);
+#endif
+ } else {
+#ifdef HAVE_PSELECT
+ struct timespec ts;
+ struct timespec *tsptr;
+
+ if (timeout != GST_CLOCK_TIME_NONE) {
+ GST_TIME_TO_TIMESPEC (timeout, ts);
+ tsptr = &ts;
+ } else {
+ tsptr = NULL;
+ }
+
+ res = pselect (max_fd + 1, &readfds, &writefds, NULL, tsptr, NULL);
+#endif
+ }
+
+ if (res > 0) {
+ fd_set_to_pollfd (set, &readfds, &writefds);
+ }
+#else /* G_OS_WIN32 */
+ g_assert_not_reached ();
+ errno = ENOSYS;
+#endif
+ break;
+ }
+ case GST_POLL_MODE_WINDOWS:
+ {
+#ifdef G_OS_WIN32
+ gint ignore_count = set->active_fds_ignored->len;
+ DWORD t, wait_ret;
+
+ if (G_LIKELY (ignore_count == 0)) {
+ if (timeout != GST_CLOCK_TIME_NONE)
+ t = GST_TIME_AS_MSECONDS (timeout);
+ else
+ t = INFINITE;
+ } else {
+ /* already one or more ignored fds, so we quickly sweep the others */
+ t = 0;
+ }
+
+ wait_ret = WSAWaitForMultipleEvents (set->active_events->len,
+ (HANDLE *) set->active_events->data, FALSE, t, FALSE);
+
+ if (ignore_count == 0 && wait_ret == WSA_WAIT_TIMEOUT) {
+ res = 0;
+ } else if (wait_ret == WSA_WAIT_FAILED) {
+ res = -1;
+ errno = gst_poll_winsock_error_to_errno (WSAGetLastError ());
+ } else {
+ /* the first entry is the wakeup event */
+ if (wait_ret - WSA_WAIT_EVENT_0 >= 1) {
+ res = gst_poll_collect_winsock_events (set);
+ } else {
+ res = 1; /* wakeup event */
+ }
+ }
+#else
+ g_assert_not_reached ();
+ errno = ENOSYS;
+#endif
+ break;
+ }
+ }
+
+ g_mutex_lock (set->lock);
+
+ gst_poll_check_ctrl_commands (set, res, &restarting);
+
+ /* update the controllable state if needed */
+ set->controllable = set->new_controllable;
+
+ if (set->flushing) {
+ /* we got woken up and we are flushing, we need to stop */
+ errno = EBUSY;
+ res = -1;
+ break;
+ }
+ } while (restarting);
+
+ set->waiting = FALSE;
+
+ g_mutex_unlock (set->lock);
+
+ return res;
+
+ /* ERRORS */
+already_waiting:
+ {
+ g_mutex_unlock (set->lock);
+ errno = EPERM;
+ return -1;
+ }
+flushing:
+ {
+ g_mutex_unlock (set->lock);
+ errno = EBUSY;
+ return -1;
+ }
+#ifdef G_OS_WIN32
+winsock_error:
+ {
+ g_mutex_unlock (set->lock);
+ return -1;
+ }
+#endif
+}
+
+/**
+ * gst_poll_set_controllable:
+ * @set: a #GstPoll.
+ * @controllable: new controllable state.
+ *
+ * When @controllable is %TRUE, this function ensures that future calls to
+ * gst_poll_wait() will be affected by gst_poll_restart() and
+ * gst_poll_set_flushing().
+ *
+ * Returns: %TRUE if the controllability of @set could be updated.
+ *
+ * Since: 0.10.18
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+gboolean gst_poll_set_controllable (GstPoll * set, gboolean controllable)
+{
+ g_return_val_if_fail (set != NULL, FALSE);
+
+ g_mutex_lock (set->lock);
+
+#ifndef G_OS_WIN32
+ if (controllable && set->control_read_fd.fd < 0) {
+ gint control_sock[2];
+/* rj
+ if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
+ goto no_socket_pair;
+*/
+ fcntl (control_sock[0], F_SETFL, O_NONBLOCK);
+ fcntl (control_sock[1], F_SETFL, O_NONBLOCK);
+
+ set->control_read_fd.fd = control_sock[0];
+ set->control_write_fd.fd = control_sock[1];
+
+ gst_poll_add_fd_unlocked (set, &set->control_read_fd);
+ }
+
+ if (set->control_read_fd.fd >= 0)
+ gst_poll_fd_ctl_read_unlocked (set, &set->control_read_fd, controllable);
+#endif
+
+ /* delay the change of the controllable state if we are waiting */
+ set->new_controllable = controllable;
+ if (!set->waiting)
+ set->controllable = controllable;
+
+ g_mutex_unlock (set->lock);
+
+ return TRUE;
+
+ /* ERRORS */
+#ifndef G_OS_WIN32
+no_socket_pair:
+ {
+ g_mutex_unlock (set->lock);
+ return FALSE;
+ }
+#endif
+}
+
+/**
+ * gst_poll_restart:
+ * @set: a #GstPoll.
+ *
+ * Restart any gst_poll_wait() that is in progress. This function is typically
+ * used after adding or removing descriptors to @set.
+ *
+ * If @set is not controllable, then this call will have no effect.
+ *
+ * Since: 0.10.18
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+void gst_poll_restart (GstPoll * set)
+{
+ g_return_if_fail (set != NULL);
+
+ g_mutex_lock (set->lock);
+
+ if (set->controllable && set->waiting) {
+#ifndef G_OS_WIN32
+ /* if we are waiting, we can send the command, else we do not have to
+ * bother, future calls will automatically pick up the new fdset */
+ SEND_COMMAND (set, GST_POLL_CMD_WAKEUP);
+#else
+ SetEvent (set->wakeup_event);
+#endif
+ }
+
+ g_mutex_unlock (set->lock);
+}
+
+/**
+ * gst_poll_set_flushing:
+ * @set: a #GstPoll.
+ * @flushing: new flushing state.
+ *
+ * When @flushing is %TRUE, this function ensures that current and future calls
+ * to gst_poll_wait() will return -1, with errno set to EBUSY.
+ *
+ * Unsetting the flushing state will restore normal operation of @set.
+ *
+ * Since: 0.10.18
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+void gst_poll_set_flushing (GstPoll * set, gboolean flushing)
+{
+ g_return_if_fail (set != NULL);
+
+ g_mutex_lock (set->lock);
+
+ /* update the new state first */
+ set->flushing = flushing;
+
+ if (flushing && set->controllable && set->waiting) {
+ /* we are flushing, controllable and waiting, wake up the waiter. When we
+ * stop the flushing operation we don't clear the wakeup fd here, this will
+ * happen in the _wait() thread. */
+#ifndef G_OS_WIN32
+ SEND_COMMAND (set, GST_POLL_CMD_WAKEUP);
+#else
+ SetEvent (set->wakeup_event);
+#endif
+ }
+
+ g_mutex_unlock (set->lock);
+}