diff -r 567bb019e3e3 -r 7e817e7e631c gstreamer_core/gst/gsttask.c --- a/gstreamer_core/gst/gsttask.c Tue Aug 31 15:30:33 2010 +0300 +++ b/gstreamer_core/gst/gsttask.c Wed Sep 01 12:16:41 2010 +0100 @@ -42,14 +42,13 @@ * gst_task_set_lock(). * * The task can be started, paused and stopped with gst_task_start(), gst_task_pause() - * and gst_task_stop() respectively or with the gst_task_set_state() function. + * and gst_task_stop() respectively. * - * A #GstTask will repeatedly call the #GstTaskFunction with the user data - * that was provided when creating the task with gst_task_create(). While calling - * the function it will acquire the provided lock. The provided lock is released - * when the task pauses or stops. + * A #GstTask will repeadedly call the #GstTaskFunction with the user data + * that was provided when creating the task with gst_task_create(). Before calling + * the function it will acquire the provided lock. * - * Stopping a task with gst_task_stop() will not immediately make sure the task is + * Stopping a task with gst_task_stop() will not immediatly make sure the task is * not running anymore. Use gst_task_join() to make sure the task is completely * stopped and the thread is stopped. * @@ -71,53 +70,45 @@ GST_DEBUG_CATEGORY_STATIC (task_debug); #define GST_CAT_DEFAULT (task_debug) -#define GST_TASK_GET_PRIVATE(obj) \ - (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_TASK, GstTaskPrivate)) - -struct _GstTaskPrivate -{ - /* callbacks for managing the thread of this task */ - GstTaskThreadCallbacks thr_callbacks; - gpointer thr_user_data; - GDestroyNotify thr_notify; - - gboolean prio_set; - GThreadPriority priority; - - /* configured pool */ - GstTaskPool *pool; - - /* remember the pool and id that is currently running. */ - gpointer id; - GstTaskPool *pool_id; -}; - static void gst_task_class_init (GstTaskClass * klass); static void gst_task_init (GstTask * task); static void gst_task_finalize (GObject * object); -static void gst_task_func (GstTask * task); +static void gst_task_func (GstTask * task, GstTaskClass * tclass); + +static GstObjectClass *parent_class = NULL; static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT; +#ifdef __SYMBIAN32__ +EXPORT_C +#endif -#define _do_init \ -{ \ - GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks"); \ -} -G_DEFINE_TYPE_WITH_CODE (GstTask, gst_task, GST_TYPE_OBJECT, _do_init); +GType +gst_task_get_type (void) +{ + static GType _gst_task_type = 0; -static void -init_klass_pool (GstTaskClass * klass) -{ - g_static_mutex_lock (&pool_lock); - if (klass->pool) { - gst_task_pool_cleanup (klass->pool); - gst_object_unref (klass->pool); + if (G_UNLIKELY (_gst_task_type == 0)) { + static const GTypeInfo task_info = { + sizeof (GstTaskClass), + NULL, + NULL, + (GClassInitFunc) gst_task_class_init, + NULL, + NULL, + sizeof (GstTask), + 0, + (GInstanceInitFunc) gst_task_init, + NULL + }; + + _gst_task_type = + g_type_register_static (GST_TYPE_OBJECT, "GstTask", &task_info, 0); + + GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks"); } - klass->pool = gst_task_pool_new (); - gst_task_pool_prepare (klass->pool, NULL); - g_static_mutex_unlock (&pool_lock); + return _gst_task_type; } static void @@ -127,66 +118,44 @@ gobject_class = (GObjectClass *) klass; - g_type_class_add_private (klass, sizeof (GstTaskPrivate)); + parent_class = g_type_class_peek_parent (klass); gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_task_finalize); - init_klass_pool (klass); + klass->pool = g_thread_pool_new ( + (GFunc) gst_task_func, klass, -1, FALSE, NULL); } static void gst_task_init (GstTask * task) { - GstTaskClass *klass; - - klass = GST_TASK_GET_CLASS (task); - - task->priv = GST_TASK_GET_PRIVATE (task); task->running = FALSE; task->abidata.ABI.thread = NULL; task->lock = NULL; task->cond = g_cond_new (); task->state = GST_TASK_STOPPED; - task->priv->prio_set = FALSE; - - /* use the default klass pool for this task, users can - * override this later */ - g_static_mutex_lock (&pool_lock); - task->priv->pool = gst_object_ref (klass->pool); - g_static_mutex_unlock (&pool_lock); } static void gst_task_finalize (GObject * object) { GstTask *task = GST_TASK (object); - GstTaskPrivate *priv = task->priv; GST_DEBUG ("task %p finalize", task); - if (priv->thr_notify) - priv->thr_notify (priv->thr_user_data); - priv->thr_notify = NULL; - priv->thr_user_data = NULL; - - gst_object_unref (priv->pool); - /* task thread cannot be running here since it holds a ref * to the task so that the finalize could not have happened */ g_cond_free (task->cond); task->cond = NULL; - G_OBJECT_CLASS (gst_task_parent_class)->finalize (object); + G_OBJECT_CLASS (parent_class)->finalize (object); } static void -gst_task_func (GstTask * task) +gst_task_func (GstTask * task, GstTaskClass * tclass) { GStaticRecMutex *lock; GThread *tself; - GstTaskPrivate *priv; - - priv = task->priv; tself = g_thread_self (); @@ -202,15 +171,8 @@ if (G_UNLIKELY (lock == NULL)) goto no_lock; task->abidata.ABI.thread = tself; - /* only update the priority when it was changed */ - if (priv->prio_set) - g_thread_set_priority (tself, priv->priority); GST_OBJECT_UNLOCK (task); - /* fire the enter_thread callback when we need to */ - if (priv->thr_callbacks.enter_thread) - priv->thr_callbacks.enter_thread (task, tself, priv->thr_user_data); - /* locking order is TASK_LOCK, LOCK */ g_static_rec_mutex_lock (lock); GST_OBJECT_LOCK (task); @@ -247,17 +209,6 @@ task->abidata.ABI.thread = NULL; exit: - if (priv->thr_callbacks.leave_thread) { - /* fire the leave_thread callback when we need to. We need to do this before - * we signal the task and with the task lock released. */ - GST_OBJECT_UNLOCK (task); - priv->thr_callbacks.leave_thread (task, tself, priv->thr_user_data); - GST_OBJECT_LOCK (task); - } else { - /* restore normal priority when releasing back into the pool, we will not - * touch the priority when a custom callback has been installed. */ - g_thread_set_priority (tself, G_THREAD_PRIORITY_NORMAL); - } /* now we allow messing with the lock again by setting the running flag to * FALSE. Together with the SIGNAL this is the sign for the _join() to * complete. @@ -285,7 +236,7 @@ * gst_task_cleanup_all: * * Wait for all tasks to be stopped. This is mainly used internally - * to ensure proper cleanup of internal data structures in test suites. + * to ensure proper cleanup of internal datastructures in testsuites. * * MT safe. */ @@ -299,7 +250,18 @@ GstTaskClass *klass; if ((klass = g_type_class_peek (GST_TYPE_TASK))) { - init_klass_pool (klass); + g_static_mutex_lock (&pool_lock); + if (klass->pool) { + /* Shut down all the threads, we still process the ones scheduled + * because the unref happens in the thread function. + * Also wait for currently running ones to finish. */ + g_thread_pool_free (klass->pool, FALSE, TRUE); + /* create new pool, so we can still do something after this + * call. */ + klass->pool = g_thread_pool_new ( + (GFunc) gst_task_func, klass, -1, FALSE, NULL); + } + g_static_mutex_unlock (&pool_lock); } } @@ -308,19 +270,12 @@ * @func: The #GstTaskFunction to use * @data: User data to pass to @func * - * Create a new Task that will repeatedly call the provided @func + * Create a new Task that will repeadedly call the provided @func * with @data as a parameter. Typically the task will run in * a new thread. * * The function cannot be changed after the task has been created. You - * must create a new #GstTask to change the function. - * - * This function will not yet create and start a thread. Use gst_task_start() or - * gst_task_pause() to create and start the GThread. - * - * Before the task can be used, a #GStaticRecMutex must be configured using the - * gst_task_set_lock() function. This lock will always be acquired while - * @func is called. + * must create a new GstTask to change the function. * * Returns: A new #GstTask. * @@ -347,7 +302,7 @@ /** * gst_task_set_lock: * @task: The #GstTask to use - * @mutex: The #GMutex to use + * @mutex: The GMutex to use * * Set the mutex used by the task. The mutex will be acquired before * calling the #GstTaskFunction. @@ -380,175 +335,6 @@ } } -/** - * gst_task_set_priority: - * @task: a #GstTask - * @priority: a new priority for @task - * - * Changes the priority of @task to @priority. - * - * Note: try not to depend on task priorities. - * - * MT safe. - * - * Since: 0.10.24 - */ -#ifdef __SYMBIAN32__ -EXPORT_C -#endif - -void -gst_task_set_priority (GstTask * task, GThreadPriority priority) -{ - GstTaskPrivate *priv; - GThread *thread; - - g_return_if_fail (GST_IS_TASK (task)); - - priv = task->priv; - - GST_OBJECT_LOCK (task); - priv->prio_set = TRUE; - priv->priority = priority; - thread = task->abidata.ABI.thread; - if (thread != NULL) { - /* if this task already has a thread, we can configure the priority right - * away, else we do that when we assign a thread to the task. */ - g_thread_set_priority (thread, priority); - } - GST_OBJECT_UNLOCK (task); -} - -/** - * gst_task_get_pool: - * @task: a #GstTask - * - * Get the #GstTaskPool that this task will use for its streaming - * threads. - * - * MT safe. - * - * Returns: the #GstTaskPool used by @task. gst_object_unref() - * after usage. - * - * Since: 0.10.24 - */ -#ifdef __SYMBIAN32__ -EXPORT_C -#endif - -GstTaskPool * -gst_task_get_pool (GstTask * task) -{ - GstTaskPool *result; - GstTaskPrivate *priv; - - g_return_val_if_fail (GST_IS_TASK (task), NULL); - - priv = task->priv; - - GST_OBJECT_LOCK (task); - result = gst_object_ref (priv->pool); - GST_OBJECT_UNLOCK (task); - - return result; -} - -/** - * gst_task_set_pool: - * @task: a #GstTask - * @pool: a #GstTaskPool - * - * Set @pool as the new GstTaskPool for @task. Any new streaming threads that - * will be created by @task will now use @pool. - * - * MT safe. - * - * Since: 0.10.24 - */ -#ifdef __SYMBIAN32__ -EXPORT_C -#endif - -void -gst_task_set_pool (GstTask * task, GstTaskPool * pool) -{ - GstTaskPool *old; - GstTaskPrivate *priv; - - g_return_if_fail (GST_IS_TASK (task)); - g_return_if_fail (GST_IS_TASK_POOL (pool)); - - priv = task->priv; - - GST_OBJECT_LOCK (task); - if (priv->pool != pool) { - old = priv->pool; - priv->pool = gst_object_ref (pool); - } else - old = NULL; - GST_OBJECT_UNLOCK (task); - - if (old) - gst_object_unref (old); -} - - -/** - * gst_task_set_thread_callbacks: - * @task: The #GstTask to use - * @callbacks: a #GstTaskThreadCallbacks pointer - * @user_data: user data passed to the callbacks - * @notify: called when @user_data is no longer referenced - * - * Set callbacks which will be executed when a new thread is needed, the thread - * function is entered and left and when the thread is joined. - * - * By default a thread for @task will be created from a default thread pool. - * - * Objects can use custom GThreads or can perform additional configuration of - * the threads (such as changing the thread priority) by installing callbacks. - * - * MT safe. - * - * Since: 0.10.24 - */ -#ifdef __SYMBIAN32__ -EXPORT_C -#endif - -void -gst_task_set_thread_callbacks (GstTask * task, - GstTaskThreadCallbacks * callbacks, gpointer user_data, - GDestroyNotify notify) -{ - GDestroyNotify old_notify; - - g_return_if_fail (task != NULL); - g_return_if_fail (GST_IS_TASK (task)); - g_return_if_fail (callbacks != NULL); - - GST_OBJECT_LOCK (task); - old_notify = task->priv->thr_notify; - - if (old_notify) { - gpointer old_data; - - old_data = task->priv->thr_user_data; - - task->priv->thr_user_data = NULL; - task->priv->thr_notify = NULL; - GST_OBJECT_UNLOCK (task); - - old_notify (old_data); - - GST_OBJECT_LOCK (task); - } - task->priv->thr_callbacks = *callbacks; - task->priv->thr_user_data = user_data; - task->priv->thr_notify = notify; - GST_OBJECT_UNLOCK (task); -} /** * gst_task_get_state: @@ -578,122 +364,14 @@ return result; } -/* make sure the task is running and start a thread if it's not. - * This function must be called with the task LOCK. */ -static gboolean -start_task (GstTask * task) -{ - gboolean res = TRUE; - GstTaskClass *tclass; - GError *error = NULL; - GstTaskPrivate *priv; - - priv = task->priv; - - /* new task, We ref before so that it remains alive while - * the thread is running. */ - gst_object_ref (task); - /* mark task as running so that a join will wait until we schedule - * and exit the task function. */ - task->running = TRUE; - - tclass = GST_TASK_GET_CLASS (task); - - /* push on the thread pool, we remember the original pool because the user - * could change it later on and then we join to the wrong pool. */ - priv->pool_id = gst_object_ref (priv->pool); - priv->id = - gst_task_pool_push (priv->pool_id, (GstTaskPoolFunction) gst_task_func, - task, &error); - - if (error != NULL) { - g_warning ("failed to create thread: %s", error->message); - g_error_free (error); - res = FALSE; - } - return res; -} - - -/** - * gst_task_set_state: - * @task: a #GstTask - * @state: the new task state - * - * Sets the state of @task to @state. - * - * The @task must have a lock associated with it using - * gst_task_set_lock() when going to GST_TASK_STARTED or GST_TASK_PAUSED or - * this function will return %FALSE. - * - * MT safe. - * - * Returns: %TRUE if the state could be changed. - * - * Since: 0.10.24 - */ -#ifdef __SYMBIAN32__ -EXPORT_C -#endif - -gboolean -gst_task_set_state (GstTask * task, GstTaskState state) -{ - GstTaskState old; - gboolean res = TRUE; - - g_return_val_if_fail (GST_IS_TASK (task), FALSE); - - GST_DEBUG_OBJECT (task, "Changing task %p to state %d", task, state); - - GST_OBJECT_LOCK (task); - if (state != GST_TASK_STOPPED) - if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL)) - goto no_lock; - - /* if the state changed, do our thing */ - old = task->state; - if (old != state) { - task->state = state; - switch (old) { - case GST_TASK_STOPPED: - /* If the task already has a thread scheduled we don't have to do - * anything. */ - if (G_UNLIKELY (!task->running)) - res = start_task (task); - break; - case GST_TASK_PAUSED: - /* when we are paused, signal to go to the new state */ - GST_TASK_SIGNAL (task); - break; - case GST_TASK_STARTED: - /* if we were started, we'll go to the new state after the next - * iteration. */ - break; - } - } - GST_OBJECT_UNLOCK (task); - - return res; - - /* ERRORS */ -no_lock: - { - GST_WARNING_OBJECT (task, "state %d set on task without a lock", state); - GST_OBJECT_UNLOCK (task); - g_warning ("task without a lock can't be set to state %d", state); - return FALSE; - } -} - /** * gst_task_start: * @task: The #GstTask to start * * Starts @task. The @task must have a lock associated with it using - * gst_task_set_lock() or this function will return %FALSE. + * gst_task_set_lock() or thsi function will return FALSE. * - * Returns: %TRUE if the task could be started. + * Returns: TRUE if the task could be started. * * MT safe. */ @@ -704,7 +382,62 @@ gboolean gst_task_start (GstTask * task) { - return gst_task_set_state (task, GST_TASK_STARTED); + GstTaskState old; + + g_return_val_if_fail (GST_IS_TASK (task), FALSE); + + GST_DEBUG_OBJECT (task, "Starting task %p", task); + + GST_OBJECT_LOCK (task); + if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL)) + goto no_lock; + + old = task->state; + task->state = GST_TASK_STARTED; + switch (old) { + case GST_TASK_STOPPED: + { + GstTaskClass *tclass; + + /* If the task already has a thread scheduled we don't have to do + * anything. */ + if (task->running) + break; + + /* new task, push on threadpool. We ref before so + * that it remains alive while on the threadpool. */ + gst_object_ref (task); + /* mark task as running so that a join will wait until we schedule + * and exit the task function. */ + task->running = TRUE; + + tclass = GST_TASK_GET_CLASS (task); + + g_static_mutex_lock (&pool_lock); + g_thread_pool_push (tclass->pool, task, NULL); + g_static_mutex_unlock (&pool_lock); + break; + } + case GST_TASK_PAUSED: + /* PAUSE to PLAY, signal */ + GST_TASK_SIGNAL (task); + break; + case GST_TASK_STARTED: + /* was OK */ + break; + } + GST_OBJECT_UNLOCK (task); + + return TRUE; + + /* ERRORS */ +no_lock: + { + GST_WARNING_OBJECT (task, "starting task without a lock"); + GST_OBJECT_UNLOCK (task); + g_warning ("starting task without a lock"); + return FALSE; + } } /** @@ -715,7 +448,7 @@ * will not wait for the task to have completely stopped. Use * gst_task_join() to stop and wait for completion. * - * Returns: %TRUE if the task could be stopped. + * Returns: TRUE if the task could be stopped. * * MT safe. */ @@ -726,7 +459,30 @@ gboolean gst_task_stop (GstTask * task) { - return gst_task_set_state (task, GST_TASK_STOPPED); + GstTaskClass *tclass; + GstTaskState old; + + g_return_val_if_fail (GST_IS_TASK (task), FALSE); + + tclass = GST_TASK_GET_CLASS (task); + + GST_DEBUG_OBJECT (task, "Stopping task %p", task); + + GST_OBJECT_LOCK (task); + old = task->state; + task->state = GST_TASK_STOPPED; + switch (old) { + case GST_TASK_STOPPED: + break; + case GST_TASK_PAUSED: + GST_TASK_SIGNAL (task); + break; + case GST_TASK_STARTED: + break; + } + GST_OBJECT_UNLOCK (task); + + return TRUE; } /** @@ -738,17 +494,63 @@ * in the paused state. This function does not wait for the task to complete * the paused state. * - * Returns: %TRUE if the task could be paused. + * Returns: TRUE if the task could be paused. * * MT safe. */ #ifdef __SYMBIAN32__ EXPORT_C -#endif +#endif gboolean gst_task_pause (GstTask * task) { - return gst_task_set_state (task, GST_TASK_PAUSED); + GstTaskState old; + + g_return_val_if_fail (GST_IS_TASK (task), FALSE); + + GST_DEBUG_OBJECT (task, "Pausing task %p", task); + + GST_OBJECT_LOCK (task); + if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL)) + goto no_lock; + + old = task->state; + task->state = GST_TASK_PAUSED; + switch (old) { + case GST_TASK_STOPPED: + { + GstTaskClass *tclass; + + if (task->running) + break; + + gst_object_ref (task); + task->running = TRUE; + + tclass = GST_TASK_GET_CLASS (task); + + g_static_mutex_lock (&pool_lock); + g_thread_pool_push (tclass->pool, task, NULL); + g_static_mutex_unlock (&pool_lock); + break; + } + case GST_TASK_PAUSED: + break; + case GST_TASK_STARTED: + break; + } + GST_OBJECT_UNLOCK (task); + + return TRUE; + + /* ERRORS */ +no_lock: + { + GST_WARNING_OBJECT (task, "pausing task without a lock"); + GST_OBJECT_UNLOCK (task); + g_warning ("pausing task without a lock"); + return FALSE; + } } /** @@ -764,7 +566,7 @@ * would cause a deadlock. The function will detect this and print a * g_warning. * - * Returns: %TRUE if the task could be joined. + * Returns: TRUE if the task could be joined. * * MT safe. */ @@ -776,11 +578,6 @@ gst_task_join (GstTask * task) { GThread *tself; - GstTaskPrivate *priv; - gpointer id; - GstTaskPool *pool = NULL; - - priv = task->priv; g_return_val_if_fail (GST_IS_TASK (task), FALSE); @@ -789,33 +586,20 @@ GST_DEBUG_OBJECT (task, "Joining task %p, thread %p", task, tself); /* we don't use a real thread join here because we are using - * thread pools */ + * threadpools */ GST_OBJECT_LOCK (task); if (G_UNLIKELY (tself == task->abidata.ABI.thread)) goto joining_self; task->state = GST_TASK_STOPPED; /* signal the state change for when it was blocked in PAUSED. */ GST_TASK_SIGNAL (task); - /* we set the running flag when pushing the task on the thread pool. + /* we set the running flag when pushing the task on the threadpool. * This means that the task function might not be called when we try * to join it here. */ while (G_LIKELY (task->running)) GST_TASK_WAIT (task); - /* clean the thread */ - task->abidata.ABI.thread = NULL; - /* get the id and pool to join */ - pool = priv->pool_id; - id = priv->id; - priv->pool_id = NULL; - priv->id = NULL; GST_OBJECT_UNLOCK (task); - if (pool) { - if (id) - gst_task_pool_join (pool, id); - gst_object_unref (pool); - } - GST_DEBUG_OBJECT (task, "Joined task %p", task); return TRUE;