diff -r 9b2c3c7a1a9c -r 567bb019e3e3 gstreamer_core/gst/gstpoll.c --- a/gstreamer_core/gst/gstpoll.c Wed Mar 31 22:03:18 2010 +0300 +++ b/gstreamer_core/gst/gstpoll.c Tue Aug 31 15:30:33 2010 +0300 @@ -68,6 +68,8 @@ #include #endif +#include "gst_private.h" + #include #ifdef HAVE_UNISTD_H @@ -89,8 +91,6 @@ /* OS/X needs this because of bad headers */ #include -#include "gst_private.h" - #include "gstpoll.h" #ifdef __SYMBIAN32__ @@ -112,15 +112,25 @@ /* the poll/select call is also performed on a control socket, that way * we can send special commands to control it */ -#define SEND_COMMAND(set, command) \ +/* FIXME: Shouldn't we check or return the return value + * of write()? + */ +#define SEND_COMMAND(set, command, result) \ G_STMT_START { \ unsigned char c = command; \ - write (set->control_write_fd.fd, &c, 1); \ + result = write (set->control_write_fd.fd, &c, 1); \ + if (result > 0) \ + set->control_pending++; \ } G_STMT_END -#define READ_COMMAND(set, command, res) \ -G_STMT_START { \ - res = read (set->control_read_fd.fd, &command, 1); \ +#define READ_COMMAND(set, command, res) \ +G_STMT_START { \ + if (set->control_pending > 0) { \ + res = read (set->control_read_fd.fd, &command, 1); \ + if (res == 1) \ + set->control_pending--; \ + } else \ + res = 0; \ } G_STMT_END #define GST_POLL_CMD_WAKEUP 'W' /* restart the poll/select call */ @@ -160,7 +170,6 @@ GstPollFD control_write_fd; #else GArray *active_fds_ignored; - GArray *events; GArray *active_events; @@ -169,8 +178,10 @@ gboolean controllable; gboolean new_controllable; - gboolean waiting; + guint waiting; + guint control_pending; gboolean flushing; + gboolean timer; }; static gint @@ -298,7 +309,7 @@ FD_ZERO (writefds); g_mutex_lock (set->lock); - + for (i = 0; i < set->active_fds->len; i++) { #ifndef __SYMBIAN32__ struct pollfd *pfd = &g_array_index (set->fds, struct pollfd, i); @@ -316,8 +327,7 @@ } } - g_mutex_unlock (set->lock); - + g_mutex_unlock (set->lock); return max_fd; } @@ -502,7 +512,7 @@ { GstPoll *nset; - nset = g_new0 (GstPoll, 1); + nset = g_slice_new0 (GstPoll); nset->lock = g_mutex_new (); #ifndef G_OS_WIN32 nset->mode = GST_POLL_MODE_AUTO; @@ -537,6 +547,40 @@ } /** + * gst_poll_new_timer: + * + * Create a new poll object that can be used for scheduling cancellable + * timeouts. + * + * A timeout is performed with gst_poll_wait(). Multiple timeouts can be + * performed from different threads. + * + * Returns: a new #GstPoll, or %NULL in case of an error. Free with + * gst_poll_free(). + * + * Since: 0.10.23 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +GstPoll * +gst_poll_new_timer (void) +{ + GstPoll *poll; + + /* make a new controllable poll set */ + if (!(poll = gst_poll_new (TRUE))) + goto done; + + /* we are a timer */ + poll->timer = TRUE; + +done: + return poll; +} + +/** * gst_poll_free: * @set: a file descriptor set. * @@ -575,7 +619,7 @@ g_array_free (set->active_fds, TRUE); g_array_free (set->fds, TRUE); g_mutex_free (set->lock); - g_free (set); + g_slice_free (GstPoll, set); } /** @@ -756,7 +800,8 @@ else pfd->events &= ~POLLOUT; #else - gst_poll_update_winsock_event_mask (set, idx, FD_WRITE, active); + gst_poll_update_winsock_event_mask (set, idx, FD_WRITE | FD_CONNECT, + active); #endif } @@ -812,7 +857,9 @@ #ifdef __SYMBIAN32__ EXPORT_C #endif - gboolean gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active) + +gboolean +gst_poll_fd_ctl_read (GstPoll * set, GstPollFD * fd, gboolean active) { gboolean ret; @@ -849,7 +896,8 @@ EXPORT_C #endif - void gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd) +void +gst_poll_fd_ignored (GstPoll * set, GstPollFD * fd) { #ifdef G_OS_WIN32 gint idx; @@ -886,7 +934,8 @@ EXPORT_C #endif - gboolean gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd) +gboolean +gst_poll_fd_has_closed (const GstPoll * set, GstPollFD * fd) { gboolean res = FALSE; gint idx; @@ -959,7 +1008,8 @@ res = (wfd->events.iErrorCode[FD_CLOSE_BIT] != 0) || (wfd->events.iErrorCode[FD_READ_BIT] != 0) || (wfd->events.iErrorCode[FD_WRITE_BIT] != 0) || - (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0); + (wfd->events.iErrorCode[FD_ACCEPT_BIT] != 0) || + (wfd->events.iErrorCode[FD_CONNECT_BIT] != 0); #endif } @@ -1006,7 +1056,8 @@ EXPORT_C #endif -gboolean gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd) +gboolean +gst_poll_fd_can_read (const GstPoll * set, GstPollFD * fd) { gboolean res = FALSE; @@ -1038,7 +1089,8 @@ EXPORT_C #endif -gboolean gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd) +gboolean +gst_poll_fd_can_write (const GstPoll * set, GstPollFD * fd) { gboolean res = FALSE; gint idx; @@ -1113,8 +1165,13 @@ * Wait for activity on the file descriptors in @set. This function waits up to * the specified @timeout. A timeout of #GST_CLOCK_TIME_NONE waits forever. * - * When this function is called from multiple threads, -1 will be returned with - * errno set to EPERM. + * For #GstPoll objects created with gst_poll_new(), this function can only be + * called from a single thread at a time. If called from multiple threads, + * -1 will be returned with errno set to EPERM. + * + * This is not true for timer #GstPoll objects created with + * gst_poll_new_timer(), where it is allowed to have multiple threads waiting + * simultaneously. * * Returns: The number of #GstPollFD in @set that have activity or 0 when no * activity was detected after @timeout. If an error occurs, -1 is returned @@ -1126,24 +1183,26 @@ EXPORT_C #endif -gint gst_poll_wait (GstPoll * set, GstClockTime timeout) +gint +gst_poll_wait (GstPoll * set, GstClockTime timeout) { gboolean restarting; - int res = -1; + int res; g_return_val_if_fail (set != NULL, -1); g_mutex_lock (set->lock); - /* we cannot wait from multiple threads */ - if (set->waiting) + /* we cannot wait from multiple threads unless we are a timer */ + if (G_UNLIKELY (set->waiting > 0 && !set->timer)) goto already_waiting; /* flushing, exit immediatly */ - if (set->flushing) + if (G_UNLIKELY (set->flushing)) goto flushing; - set->waiting = TRUE; + /* add one more waiter */ + set->waiting++; do { GstPollMode mode; @@ -1240,7 +1299,7 @@ } else { tvptr = NULL; } -//temporary fix for multifdsink + FD_ZERO(&excepfds); if( max_fd != -1) @@ -1332,20 +1391,21 @@ g_mutex_lock (set->lock); - gst_poll_check_ctrl_commands (set, res, &restarting); + if (!set->timer) + gst_poll_check_ctrl_commands (set, res, &restarting); /* update the controllable state if needed */ set->controllable = set->new_controllable; - if (set->flushing) { + if (G_UNLIKELY (set->flushing)) { /* we got woken up and we are flushing, we need to stop */ errno = EBUSY; res = -1; break; } - } while (restarting); + } while (G_UNLIKELY (restarting)); - set->waiting = FALSE; + set->waiting--; g_mutex_unlock (set->lock); @@ -1367,6 +1427,7 @@ #ifdef G_OS_WIN32 winsock_error: { + set->waiting--; g_mutex_unlock (set->lock); return -1; } @@ -1390,7 +1451,8 @@ EXPORT_C #endif -gboolean gst_poll_set_controllable (GstPoll * set, gboolean controllable) +gboolean +gst_poll_set_controllable (GstPoll * set, gboolean controllable) { g_return_val_if_fail (set != NULL, FALSE); @@ -1403,6 +1465,7 @@ if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0) goto no_socket_pair; */ + pipe(control_sock); fcntl (control_sock[0], F_SETFL, O_NONBLOCK); fcntl (control_sock[1], F_SETFL, O_NONBLOCK); @@ -1418,7 +1481,7 @@ /* delay the change of the controllable state if we are waiting */ set->new_controllable = controllable; - if (!set->waiting) + if (set->waiting == 0) set->controllable = controllable; g_mutex_unlock (set->lock); @@ -1450,17 +1513,20 @@ EXPORT_C #endif -void gst_poll_restart (GstPoll * set) +void +gst_poll_restart (GstPoll * set) { g_return_if_fail (set != NULL); g_mutex_lock (set->lock); - if (set->controllable && set->waiting) { + if (set->controllable && set->waiting > 0) { #ifndef G_OS_WIN32 + gint result; + /* if we are waiting, we can send the command, else we do not have to * bother, future calls will automatically pick up the new fdset */ - SEND_COMMAND (set, GST_POLL_CMD_WAKEUP); + SEND_COMMAND (set, GST_POLL_CMD_WAKEUP, result); #else SetEvent (set->wakeup_event); #endif @@ -1485,7 +1551,8 @@ EXPORT_C #endif -void gst_poll_set_flushing (GstPoll * set, gboolean flushing) +void +gst_poll_set_flushing (GstPoll * set, gboolean flushing) { g_return_if_fail (set != NULL); @@ -1494,12 +1561,14 @@ /* update the new state first */ set->flushing = flushing; - if (flushing && set->controllable && set->waiting) { + if (flushing && set->controllable && set->waiting > 0) { /* we are flushing, controllable and waiting, wake up the waiter. When we * stop the flushing operation we don't clear the wakeup fd here, this will * happen in the _wait() thread. */ #ifndef G_OS_WIN32 - SEND_COMMAND (set, GST_POLL_CMD_WAKEUP); + gint result; + + SEND_COMMAND (set, GST_POLL_CMD_WAKEUP, result); #else SetEvent (set->wakeup_event); #endif @@ -1507,3 +1576,88 @@ g_mutex_unlock (set->lock); } + +/** + * gst_poll_write_control: + * @set: a #GstPoll. + * + * Write a byte to the control socket of the controllable @set. + * This function is mostly useful for timer #GstPoll objects created with + * gst_poll_new_timer(). + * + * It will make any current and future gst_poll_wait() function return with + * 1, meaning the control socket is set. After an equal amount of calls to + * gst_poll_read_control() have been performed, calls to gst_poll_wait() will + * block again until their timeout expired. + * + * Returns: %TRUE on success. %FALSE when @set is not controllable or when the + * byte could not be written. + * + * Since: 0.10.23 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +gboolean +gst_poll_write_control (GstPoll * set) +{ + gboolean res = FALSE; + + g_return_val_if_fail (set != NULL, FALSE); + + g_mutex_lock (set->lock); + if (set->controllable) { +#ifndef G_OS_WIN32 + gint result; + + SEND_COMMAND (set, GST_POLL_CMD_WAKEUP, result); + res = (result > 0); +#else + res = SetEvent (set->wakeup_event); +#endif + } + g_mutex_unlock (set->lock); + + return res; +} + +/** + * gst_poll_read_control: + * @set: a #GstPoll. + * + * Read a byte from the control socket of the controllable @set. + * This function is mostly useful for timer #GstPoll objects created with + * gst_poll_new_timer(). + * + * Returns: %TRUE on success. %FALSE when @set is not controllable or when there + * was no byte to read. + * + * Since: 0.10.23 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +gboolean +gst_poll_read_control (GstPoll * set) +{ + gboolean res = FALSE; + + g_return_val_if_fail (set != NULL, FALSE); + + g_mutex_lock (set->lock); + if (set->controllable) { +#ifndef G_OS_WIN32 + guchar cmd; + gint result; + READ_COMMAND (set, cmd, result); + res = (result > 0); +#else + res = ResetEvent (set->wakeup_event); +#endif + } + g_mutex_unlock (set->lock); + + return res; +}