diff -r 71e347f905f2 -r 4a7fac7dd34a gstreamer_core/gst/gsttask.c --- a/gstreamer_core/gst/gsttask.c Fri Mar 19 09:35:09 2010 +0200 +++ b/gstreamer_core/gst/gsttask.c Fri Apr 16 15:15:52 2010 +0300 @@ -42,13 +42,14 @@ * gst_task_set_lock(). * * The task can be started, paused and stopped with gst_task_start(), gst_task_pause() - * and gst_task_stop() respectively. + * and gst_task_stop() respectively or with the gst_task_set_state() function. * - * A #GstTask will repeadedly call the #GstTaskFunction with the user data - * that was provided when creating the task with gst_task_create(). Before calling - * the function it will acquire the provided lock. + * A #GstTask will repeatedly call the #GstTaskFunction with the user data + * that was provided when creating the task with gst_task_create(). While calling + * the function it will acquire the provided lock. The provided lock is released + * when the task pauses or stops. * - * Stopping a task with gst_task_stop() will not immediatly make sure the task is + * Stopping a task with gst_task_stop() will not immediately make sure the task is * not running anymore. Use gst_task_join() to make sure the task is completely * stopped and the thread is stopped. * @@ -70,45 +71,53 @@ GST_DEBUG_CATEGORY_STATIC (task_debug); #define GST_CAT_DEFAULT (task_debug) +#define GST_TASK_GET_PRIVATE(obj) \ + (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_TASK, GstTaskPrivate)) + +struct _GstTaskPrivate +{ + /* callbacks for managing the thread of this task */ + GstTaskThreadCallbacks thr_callbacks; + gpointer thr_user_data; + GDestroyNotify thr_notify; + + gboolean prio_set; + GThreadPriority priority; + + /* configured pool */ + GstTaskPool *pool; + + /* remember the pool and id that is currently running. */ + gpointer id; + GstTaskPool *pool_id; +}; + static void gst_task_class_init (GstTaskClass * klass); static void gst_task_init (GstTask * task); static void gst_task_finalize (GObject * object); -static void gst_task_func (GstTask * task, GstTaskClass * tclass); - -static GstObjectClass *parent_class = NULL; +static void gst_task_func (GstTask * task); static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT; -#ifdef __SYMBIAN32__ -EXPORT_C -#endif - -GType -gst_task_get_type (void) -{ - static GType _gst_task_type = 0; +#define _do_init \ +{ \ + GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks"); \ +} + +G_DEFINE_TYPE_WITH_CODE (GstTask, gst_task, GST_TYPE_OBJECT, _do_init); - if (G_UNLIKELY (_gst_task_type == 0)) { - static const GTypeInfo task_info = { - sizeof (GstTaskClass), - NULL, - NULL, - (GClassInitFunc) gst_task_class_init, - NULL, - NULL, - sizeof (GstTask), - 0, - (GInstanceInitFunc) gst_task_init, - NULL - }; - - _gst_task_type = - g_type_register_static (GST_TYPE_OBJECT, "GstTask", &task_info, 0); - - GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks"); +static void +init_klass_pool (GstTaskClass * klass) +{ + g_static_mutex_lock (&pool_lock); + if (klass->pool) { + gst_task_pool_cleanup (klass->pool); + gst_object_unref (klass->pool); } - return _gst_task_type; + klass->pool = gst_task_pool_new (); + gst_task_pool_prepare (klass->pool, NULL); + g_static_mutex_unlock (&pool_lock); } static void @@ -118,44 +127,66 @@ gobject_class = (GObjectClass *) klass; - parent_class = g_type_class_peek_parent (klass); + g_type_class_add_private (klass, sizeof (GstTaskPrivate)); gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_task_finalize); - klass->pool = g_thread_pool_new ( - (GFunc) gst_task_func, klass, -1, FALSE, NULL); + init_klass_pool (klass); } static void gst_task_init (GstTask * task) { + GstTaskClass *klass; + + klass = GST_TASK_GET_CLASS (task); + + task->priv = GST_TASK_GET_PRIVATE (task); task->running = FALSE; task->abidata.ABI.thread = NULL; task->lock = NULL; task->cond = g_cond_new (); task->state = GST_TASK_STOPPED; + task->priv->prio_set = FALSE; + + /* use the default klass pool for this task, users can + * override this later */ + g_static_mutex_lock (&pool_lock); + task->priv->pool = gst_object_ref (klass->pool); + g_static_mutex_unlock (&pool_lock); } static void gst_task_finalize (GObject * object) { GstTask *task = GST_TASK (object); + GstTaskPrivate *priv = task->priv; GST_DEBUG ("task %p finalize", task); + if (priv->thr_notify) + priv->thr_notify (priv->thr_user_data); + priv->thr_notify = NULL; + priv->thr_user_data = NULL; + + gst_object_unref (priv->pool); + /* task thread cannot be running here since it holds a ref * to the task so that the finalize could not have happened */ g_cond_free (task->cond); task->cond = NULL; - G_OBJECT_CLASS (parent_class)->finalize (object); + G_OBJECT_CLASS (gst_task_parent_class)->finalize (object); } static void -gst_task_func (GstTask * task, GstTaskClass * tclass) +gst_task_func (GstTask * task) { GStaticRecMutex *lock; GThread *tself; + GstTaskPrivate *priv; + + priv = task->priv; tself = g_thread_self (); @@ -171,8 +202,15 @@ if (G_UNLIKELY (lock == NULL)) goto no_lock; task->abidata.ABI.thread = tself; + /* only update the priority when it was changed */ + if (priv->prio_set) + g_thread_set_priority (tself, priv->priority); GST_OBJECT_UNLOCK (task); + /* fire the enter_thread callback when we need to */ + if (priv->thr_callbacks.enter_thread) + priv->thr_callbacks.enter_thread (task, tself, priv->thr_user_data); + /* locking order is TASK_LOCK, LOCK */ g_static_rec_mutex_lock (lock); GST_OBJECT_LOCK (task); @@ -209,6 +247,17 @@ task->abidata.ABI.thread = NULL; exit: + if (priv->thr_callbacks.leave_thread) { + /* fire the leave_thread callback when we need to. We need to do this before + * we signal the task and with the task lock released. */ + GST_OBJECT_UNLOCK (task); + priv->thr_callbacks.leave_thread (task, tself, priv->thr_user_data); + GST_OBJECT_LOCK (task); + } else { + /* restore normal priority when releasing back into the pool, we will not + * touch the priority when a custom callback has been installed. */ + g_thread_set_priority (tself, G_THREAD_PRIORITY_NORMAL); + } /* now we allow messing with the lock again by setting the running flag to * FALSE. Together with the SIGNAL this is the sign for the _join() to * complete. @@ -236,7 +285,7 @@ * gst_task_cleanup_all: * * Wait for all tasks to be stopped. This is mainly used internally - * to ensure proper cleanup of internal datastructures in testsuites. + * to ensure proper cleanup of internal data structures in test suites. * * MT safe. */ @@ -250,18 +299,7 @@ GstTaskClass *klass; if ((klass = g_type_class_peek (GST_TYPE_TASK))) { - g_static_mutex_lock (&pool_lock); - if (klass->pool) { - /* Shut down all the threads, we still process the ones scheduled - * because the unref happens in the thread function. - * Also wait for currently running ones to finish. */ - g_thread_pool_free (klass->pool, FALSE, TRUE); - /* create new pool, so we can still do something after this - * call. */ - klass->pool = g_thread_pool_new ( - (GFunc) gst_task_func, klass, -1, FALSE, NULL); - } - g_static_mutex_unlock (&pool_lock); + init_klass_pool (klass); } } @@ -270,12 +308,19 @@ * @func: The #GstTaskFunction to use * @data: User data to pass to @func * - * Create a new Task that will repeadedly call the provided @func + * Create a new Task that will repeatedly call the provided @func * with @data as a parameter. Typically the task will run in * a new thread. * * The function cannot be changed after the task has been created. You - * must create a new GstTask to change the function. + * must create a new #GstTask to change the function. + * + * This function will not yet create and start a thread. Use gst_task_start() or + * gst_task_pause() to create and start the GThread. + * + * Before the task can be used, a #GStaticRecMutex must be configured using the + * gst_task_set_lock() function. This lock will always be acquired while + * @func is called. * * Returns: A new #GstTask. * @@ -302,7 +347,7 @@ /** * gst_task_set_lock: * @task: The #GstTask to use - * @mutex: The GMutex to use + * @mutex: The #GMutex to use * * Set the mutex used by the task. The mutex will be acquired before * calling the #GstTaskFunction. @@ -335,6 +380,175 @@ } } +/** + * gst_task_set_priority: + * @task: a #GstTask + * @priority: a new priority for @task + * + * Changes the priority of @task to @priority. + * + * Note: try not to depend on task priorities. + * + * MT safe. + * + * Since: 0.10.24 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +void +gst_task_set_priority (GstTask * task, GThreadPriority priority) +{ + GstTaskPrivate *priv; + GThread *thread; + + g_return_if_fail (GST_IS_TASK (task)); + + priv = task->priv; + + GST_OBJECT_LOCK (task); + priv->prio_set = TRUE; + priv->priority = priority; + thread = task->abidata.ABI.thread; + if (thread != NULL) { + /* if this task already has a thread, we can configure the priority right + * away, else we do that when we assign a thread to the task. */ + g_thread_set_priority (thread, priority); + } + GST_OBJECT_UNLOCK (task); +} + +/** + * gst_task_get_pool: + * @task: a #GstTask + * + * Get the #GstTaskPool that this task will use for its streaming + * threads. + * + * MT safe. + * + * Returns: the #GstTaskPool used by @task. gst_object_unref() + * after usage. + * + * Since: 0.10.24 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +GstTaskPool * +gst_task_get_pool (GstTask * task) +{ + GstTaskPool *result; + GstTaskPrivate *priv; + + g_return_val_if_fail (GST_IS_TASK (task), NULL); + + priv = task->priv; + + GST_OBJECT_LOCK (task); + result = gst_object_ref (priv->pool); + GST_OBJECT_UNLOCK (task); + + return result; +} + +/** + * gst_task_set_pool: + * @task: a #GstTask + * @pool: a #GstTaskPool + * + * Set @pool as the new GstTaskPool for @task. Any new streaming threads that + * will be created by @task will now use @pool. + * + * MT safe. + * + * Since: 0.10.24 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +void +gst_task_set_pool (GstTask * task, GstTaskPool * pool) +{ + GstTaskPool *old; + GstTaskPrivate *priv; + + g_return_if_fail (GST_IS_TASK (task)); + g_return_if_fail (GST_IS_TASK_POOL (pool)); + + priv = task->priv; + + GST_OBJECT_LOCK (task); + if (priv->pool != pool) { + old = priv->pool; + priv->pool = gst_object_ref (pool); + } else + old = NULL; + GST_OBJECT_UNLOCK (task); + + if (old) + gst_object_unref (old); +} + + +/** + * gst_task_set_thread_callbacks: + * @task: The #GstTask to use + * @callbacks: a #GstTaskThreadCallbacks pointer + * @user_data: user data passed to the callbacks + * @notify: called when @user_data is no longer referenced + * + * Set callbacks which will be executed when a new thread is needed, the thread + * function is entered and left and when the thread is joined. + * + * By default a thread for @task will be created from a default thread pool. + * + * Objects can use custom GThreads or can perform additional configuration of + * the threads (such as changing the thread priority) by installing callbacks. + * + * MT safe. + * + * Since: 0.10.24 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +void +gst_task_set_thread_callbacks (GstTask * task, + GstTaskThreadCallbacks * callbacks, gpointer user_data, + GDestroyNotify notify) +{ + GDestroyNotify old_notify; + + g_return_if_fail (task != NULL); + g_return_if_fail (GST_IS_TASK (task)); + g_return_if_fail (callbacks != NULL); + + GST_OBJECT_LOCK (task); + old_notify = task->priv->thr_notify; + + if (old_notify) { + gpointer old_data; + + old_data = task->priv->thr_user_data; + + task->priv->thr_user_data = NULL; + task->priv->thr_notify = NULL; + GST_OBJECT_UNLOCK (task); + + old_notify (old_data); + + GST_OBJECT_LOCK (task); + } + task->priv->thr_callbacks = *callbacks; + task->priv->thr_user_data = user_data; + task->priv->thr_notify = notify; + GST_OBJECT_UNLOCK (task); +} /** * gst_task_get_state: @@ -364,14 +578,122 @@ return result; } +/* make sure the task is running and start a thread if it's not. + * This function must be called with the task LOCK. */ +static gboolean +start_task (GstTask * task) +{ + gboolean res = TRUE; + GstTaskClass *tclass; + GError *error = NULL; + GstTaskPrivate *priv; + + priv = task->priv; + + /* new task, We ref before so that it remains alive while + * the thread is running. */ + gst_object_ref (task); + /* mark task as running so that a join will wait until we schedule + * and exit the task function. */ + task->running = TRUE; + + tclass = GST_TASK_GET_CLASS (task); + + /* push on the thread pool, we remember the original pool because the user + * could change it later on and then we join to the wrong pool. */ + priv->pool_id = gst_object_ref (priv->pool); + priv->id = + gst_task_pool_push (priv->pool_id, (GstTaskPoolFunction) gst_task_func, + task, &error); + + if (error != NULL) { + g_warning ("failed to create thread: %s", error->message); + g_error_free (error); + res = FALSE; + } + return res; +} + + +/** + * gst_task_set_state: + * @task: a #GstTask + * @state: the new task state + * + * Sets the state of @task to @state. + * + * The @task must have a lock associated with it using + * gst_task_set_lock() when going to GST_TASK_STARTED or GST_TASK_PAUSED or + * this function will return %FALSE. + * + * MT safe. + * + * Returns: %TRUE if the state could be changed. + * + * Since: 0.10.24 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +gboolean +gst_task_set_state (GstTask * task, GstTaskState state) +{ + GstTaskState old; + gboolean res = TRUE; + + g_return_val_if_fail (GST_IS_TASK (task), FALSE); + + GST_DEBUG_OBJECT (task, "Changing task %p to state %d", task, state); + + GST_OBJECT_LOCK (task); + if (state != GST_TASK_STOPPED) + if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL)) + goto no_lock; + + /* if the state changed, do our thing */ + old = task->state; + if (old != state) { + task->state = state; + switch (old) { + case GST_TASK_STOPPED: + /* If the task already has a thread scheduled we don't have to do + * anything. */ + if (G_UNLIKELY (!task->running)) + res = start_task (task); + break; + case GST_TASK_PAUSED: + /* when we are paused, signal to go to the new state */ + GST_TASK_SIGNAL (task); + break; + case GST_TASK_STARTED: + /* if we were started, we'll go to the new state after the next + * iteration. */ + break; + } + } + GST_OBJECT_UNLOCK (task); + + return res; + + /* ERRORS */ +no_lock: + { + GST_WARNING_OBJECT (task, "state %d set on task without a lock", state); + GST_OBJECT_UNLOCK (task); + g_warning ("task without a lock can't be set to state %d", state); + return FALSE; + } +} + /** * gst_task_start: * @task: The #GstTask to start * * Starts @task. The @task must have a lock associated with it using - * gst_task_set_lock() or thsi function will return FALSE. + * gst_task_set_lock() or this function will return %FALSE. * - * Returns: TRUE if the task could be started. + * Returns: %TRUE if the task could be started. * * MT safe. */ @@ -382,62 +704,7 @@ gboolean gst_task_start (GstTask * task) { - GstTaskState old; - - g_return_val_if_fail (GST_IS_TASK (task), FALSE); - - GST_DEBUG_OBJECT (task, "Starting task %p", task); - - GST_OBJECT_LOCK (task); - if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL)) - goto no_lock; - - old = task->state; - task->state = GST_TASK_STARTED; - switch (old) { - case GST_TASK_STOPPED: - { - GstTaskClass *tclass; - - /* If the task already has a thread scheduled we don't have to do - * anything. */ - if (task->running) - break; - - /* new task, push on threadpool. We ref before so - * that it remains alive while on the threadpool. */ - gst_object_ref (task); - /* mark task as running so that a join will wait until we schedule - * and exit the task function. */ - task->running = TRUE; - - tclass = GST_TASK_GET_CLASS (task); - - g_static_mutex_lock (&pool_lock); - g_thread_pool_push (tclass->pool, task, NULL); - g_static_mutex_unlock (&pool_lock); - break; - } - case GST_TASK_PAUSED: - /* PAUSE to PLAY, signal */ - GST_TASK_SIGNAL (task); - break; - case GST_TASK_STARTED: - /* was OK */ - break; - } - GST_OBJECT_UNLOCK (task); - - return TRUE; - - /* ERRORS */ -no_lock: - { - GST_WARNING_OBJECT (task, "starting task without a lock"); - GST_OBJECT_UNLOCK (task); - g_warning ("starting task without a lock"); - return FALSE; - } + return gst_task_set_state (task, GST_TASK_STARTED); } /** @@ -448,7 +715,7 @@ * will not wait for the task to have completely stopped. Use * gst_task_join() to stop and wait for completion. * - * Returns: TRUE if the task could be stopped. + * Returns: %TRUE if the task could be stopped. * * MT safe. */ @@ -459,30 +726,7 @@ gboolean gst_task_stop (GstTask * task) { - GstTaskClass *tclass; - GstTaskState old; - - g_return_val_if_fail (GST_IS_TASK (task), FALSE); - - tclass = GST_TASK_GET_CLASS (task); - - GST_DEBUG_OBJECT (task, "Stopping task %p", task); - - GST_OBJECT_LOCK (task); - old = task->state; - task->state = GST_TASK_STOPPED; - switch (old) { - case GST_TASK_STOPPED: - break; - case GST_TASK_PAUSED: - GST_TASK_SIGNAL (task); - break; - case GST_TASK_STARTED: - break; - } - GST_OBJECT_UNLOCK (task); - - return TRUE; + return gst_task_set_state (task, GST_TASK_STOPPED); } /** @@ -494,63 +738,17 @@ * in the paused state. This function does not wait for the task to complete * the paused state. * - * Returns: TRUE if the task could be paused. + * Returns: %TRUE if the task could be paused. * * MT safe. */ #ifdef __SYMBIAN32__ EXPORT_C -#endif +#endif gboolean gst_task_pause (GstTask * task) { - GstTaskState old; - - g_return_val_if_fail (GST_IS_TASK (task), FALSE); - - GST_DEBUG_OBJECT (task, "Pausing task %p", task); - - GST_OBJECT_LOCK (task); - if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL)) - goto no_lock; - - old = task->state; - task->state = GST_TASK_PAUSED; - switch (old) { - case GST_TASK_STOPPED: - { - GstTaskClass *tclass; - - if (task->running) - break; - - gst_object_ref (task); - task->running = TRUE; - - tclass = GST_TASK_GET_CLASS (task); - - g_static_mutex_lock (&pool_lock); - g_thread_pool_push (tclass->pool, task, NULL); - g_static_mutex_unlock (&pool_lock); - break; - } - case GST_TASK_PAUSED: - break; - case GST_TASK_STARTED: - break; - } - GST_OBJECT_UNLOCK (task); - - return TRUE; - - /* ERRORS */ -no_lock: - { - GST_WARNING_OBJECT (task, "pausing task without a lock"); - GST_OBJECT_UNLOCK (task); - g_warning ("pausing task without a lock"); - return FALSE; - } + return gst_task_set_state (task, GST_TASK_PAUSED); } /** @@ -566,7 +764,7 @@ * would cause a deadlock. The function will detect this and print a * g_warning. * - * Returns: TRUE if the task could be joined. + * Returns: %TRUE if the task could be joined. * * MT safe. */ @@ -578,6 +776,11 @@ gst_task_join (GstTask * task) { GThread *tself; + GstTaskPrivate *priv; + gpointer id; + GstTaskPool *pool = NULL; + + priv = task->priv; g_return_val_if_fail (GST_IS_TASK (task), FALSE); @@ -586,20 +789,33 @@ GST_DEBUG_OBJECT (task, "Joining task %p, thread %p", task, tself); /* we don't use a real thread join here because we are using - * threadpools */ + * thread pools */ GST_OBJECT_LOCK (task); if (G_UNLIKELY (tself == task->abidata.ABI.thread)) goto joining_self; task->state = GST_TASK_STOPPED; /* signal the state change for when it was blocked in PAUSED. */ GST_TASK_SIGNAL (task); - /* we set the running flag when pushing the task on the threadpool. + /* we set the running flag when pushing the task on the thread pool. * This means that the task function might not be called when we try * to join it here. */ while (G_LIKELY (task->running)) GST_TASK_WAIT (task); + /* clean the thread */ + task->abidata.ABI.thread = NULL; + /* get the id and pool to join */ + pool = priv->pool_id; + id = priv->id; + priv->pool_id = NULL; + priv->id = NULL; GST_OBJECT_UNLOCK (task); + if (pool) { + if (id) + gst_task_pool_join (pool, id); + gst_object_unref (pool); + } + GST_DEBUG_OBJECT (task, "Joined task %p", task); return TRUE;