gstreamer_core/gst/gsttask.c
branchRCL_3
changeset 30 7e817e7e631c
parent 29 567bb019e3e3
equal deleted inserted replaced
29:567bb019e3e3 30:7e817e7e631c
    40  *
    40  *
    41  * Before the #GstTask can be run, it needs a #GStaticRecMutex that can be set with
    41  * Before the #GstTask can be run, it needs a #GStaticRecMutex that can be set with
    42  * gst_task_set_lock().
    42  * gst_task_set_lock().
    43  *
    43  *
    44  * The task can be started, paused and stopped with gst_task_start(), gst_task_pause()
    44  * The task can be started, paused and stopped with gst_task_start(), gst_task_pause()
    45  * and gst_task_stop() respectively or with the gst_task_set_state() function.
    45  * and gst_task_stop() respectively.
    46  *
    46  *
    47  * A #GstTask will repeatedly call the #GstTaskFunction with the user data
    47  * A #GstTask will repeadedly call the #GstTaskFunction with the user data
    48  * that was provided when creating the task with gst_task_create(). While calling
    48  * that was provided when creating the task with gst_task_create(). Before calling
    49  * the function it will acquire the provided lock. The provided lock is released
    49  * the function it will acquire the provided lock.
    50  * when the task pauses or stops.
    50  *
    51  *
    51  * Stopping a task with gst_task_stop() will not immediatly make sure the task is
    52  * Stopping a task with gst_task_stop() will not immediately make sure the task is
       
    53  * not running anymore. Use gst_task_join() to make sure the task is completely
    52  * not running anymore. Use gst_task_join() to make sure the task is completely
    54  * stopped and the thread is stopped.
    53  * stopped and the thread is stopped.
    55  *
    54  *
    56  * After creating a #GstTask, use gst_object_unref() to free its resources. This can
    55  * After creating a #GstTask, use gst_object_unref() to free its resources. This can
    57  * only be done if the task is not running anymore.
    56  * only be done if the task is not running anymore.
    68 #include <glib_global.h>
    67 #include <glib_global.h>
    69 #endif
    68 #endif
    70 
    69 
    71 GST_DEBUG_CATEGORY_STATIC (task_debug);
    70 GST_DEBUG_CATEGORY_STATIC (task_debug);
    72 #define GST_CAT_DEFAULT (task_debug)
    71 #define GST_CAT_DEFAULT (task_debug)
    73 
       
    74 #define GST_TASK_GET_PRIVATE(obj)  \
       
    75    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_TASK, GstTaskPrivate))
       
    76 
       
    77 struct _GstTaskPrivate
       
    78 {
       
    79   /* callbacks for managing the thread of this task */
       
    80   GstTaskThreadCallbacks thr_callbacks;
       
    81   gpointer thr_user_data;
       
    82   GDestroyNotify thr_notify;
       
    83 
       
    84   gboolean prio_set;
       
    85   GThreadPriority priority;
       
    86 
       
    87   /* configured pool */
       
    88   GstTaskPool *pool;
       
    89 
       
    90   /* remember the pool and id that is currently running. */
       
    91   gpointer id;
       
    92   GstTaskPool *pool_id;
       
    93 };
       
    94 
    72 
    95 static void gst_task_class_init (GstTaskClass * klass);
    73 static void gst_task_class_init (GstTaskClass * klass);
    96 static void gst_task_init (GstTask * task);
    74 static void gst_task_init (GstTask * task);
    97 static void gst_task_finalize (GObject * object);
    75 static void gst_task_finalize (GObject * object);
    98 
    76 
    99 static void gst_task_func (GstTask * task);
    77 static void gst_task_func (GstTask * task, GstTaskClass * tclass);
       
    78 
       
    79 static GstObjectClass *parent_class = NULL;
   100 
    80 
   101 static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT;
    81 static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT;
   102 
    82 #ifdef __SYMBIAN32__
   103 #define _do_init \
    83 EXPORT_C
   104 { \
    84 #endif
   105   GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks"); \
    85 
   106 }
    86 
   107 
    87 GType
   108 G_DEFINE_TYPE_WITH_CODE (GstTask, gst_task, GST_TYPE_OBJECT, _do_init);
    88 gst_task_get_type (void)
   109 
    89 {
   110 static void
    90   static GType _gst_task_type = 0;
   111 init_klass_pool (GstTaskClass * klass)
    91 
   112 {
    92   if (G_UNLIKELY (_gst_task_type == 0)) {
   113   g_static_mutex_lock (&pool_lock);
    93     static const GTypeInfo task_info = {
   114   if (klass->pool) {
    94       sizeof (GstTaskClass),
   115     gst_task_pool_cleanup (klass->pool);
    95       NULL,
   116     gst_object_unref (klass->pool);
    96       NULL,
   117   }
    97       (GClassInitFunc) gst_task_class_init,
   118   klass->pool = gst_task_pool_new ();
    98       NULL,
   119   gst_task_pool_prepare (klass->pool, NULL);
    99       NULL,
   120   g_static_mutex_unlock (&pool_lock);
   100       sizeof (GstTask),
       
   101       0,
       
   102       (GInstanceInitFunc) gst_task_init,
       
   103       NULL
       
   104     };
       
   105 
       
   106     _gst_task_type =
       
   107         g_type_register_static (GST_TYPE_OBJECT, "GstTask", &task_info, 0);
       
   108 
       
   109     GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks");
       
   110   }
       
   111   return _gst_task_type;
   121 }
   112 }
   122 
   113 
   123 static void
   114 static void
   124 gst_task_class_init (GstTaskClass * klass)
   115 gst_task_class_init (GstTaskClass * klass)
   125 {
   116 {
   126   GObjectClass *gobject_class;
   117   GObjectClass *gobject_class;
   127 
   118 
   128   gobject_class = (GObjectClass *) klass;
   119   gobject_class = (GObjectClass *) klass;
   129 
   120 
   130   g_type_class_add_private (klass, sizeof (GstTaskPrivate));
   121   parent_class = g_type_class_peek_parent (klass);
   131 
   122 
   132   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_task_finalize);
   123   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_task_finalize);
   133 
   124 
   134   init_klass_pool (klass);
   125   klass->pool = g_thread_pool_new (
       
   126       (GFunc) gst_task_func, klass, -1, FALSE, NULL);
   135 }
   127 }
   136 
   128 
   137 static void
   129 static void
   138 gst_task_init (GstTask * task)
   130 gst_task_init (GstTask * task)
   139 {
   131 {
   140   GstTaskClass *klass;
       
   141 
       
   142   klass = GST_TASK_GET_CLASS (task);
       
   143 
       
   144   task->priv = GST_TASK_GET_PRIVATE (task);
       
   145   task->running = FALSE;
   132   task->running = FALSE;
   146   task->abidata.ABI.thread = NULL;
   133   task->abidata.ABI.thread = NULL;
   147   task->lock = NULL;
   134   task->lock = NULL;
   148   task->cond = g_cond_new ();
   135   task->cond = g_cond_new ();
   149   task->state = GST_TASK_STOPPED;
   136   task->state = GST_TASK_STOPPED;
   150   task->priv->prio_set = FALSE;
       
   151 
       
   152   /* use the default klass pool for this task, users can
       
   153    * override this later */
       
   154   g_static_mutex_lock (&pool_lock);
       
   155   task->priv->pool = gst_object_ref (klass->pool);
       
   156   g_static_mutex_unlock (&pool_lock);
       
   157 }
   137 }
   158 
   138 
   159 static void
   139 static void
   160 gst_task_finalize (GObject * object)
   140 gst_task_finalize (GObject * object)
   161 {
   141 {
   162   GstTask *task = GST_TASK (object);
   142   GstTask *task = GST_TASK (object);
   163   GstTaskPrivate *priv = task->priv;
       
   164 
   143 
   165   GST_DEBUG ("task %p finalize", task);
   144   GST_DEBUG ("task %p finalize", task);
   166 
       
   167   if (priv->thr_notify)
       
   168     priv->thr_notify (priv->thr_user_data);
       
   169   priv->thr_notify = NULL;
       
   170   priv->thr_user_data = NULL;
       
   171 
       
   172   gst_object_unref (priv->pool);
       
   173 
   145 
   174   /* task thread cannot be running here since it holds a ref
   146   /* task thread cannot be running here since it holds a ref
   175    * to the task so that the finalize could not have happened */
   147    * to the task so that the finalize could not have happened */
   176   g_cond_free (task->cond);
   148   g_cond_free (task->cond);
   177   task->cond = NULL;
   149   task->cond = NULL;
   178 
   150 
   179   G_OBJECT_CLASS (gst_task_parent_class)->finalize (object);
   151   G_OBJECT_CLASS (parent_class)->finalize (object);
   180 }
   152 }
   181 
   153 
   182 static void
   154 static void
   183 gst_task_func (GstTask * task)
   155 gst_task_func (GstTask * task, GstTaskClass * tclass)
   184 {
   156 {
   185   GStaticRecMutex *lock;
   157   GStaticRecMutex *lock;
   186   GThread *tself;
   158   GThread *tself;
   187   GstTaskPrivate *priv;
       
   188 
       
   189   priv = task->priv;
       
   190 
   159 
   191   tself = g_thread_self ();
   160   tself = g_thread_self ();
   192 
   161 
   193   GST_DEBUG ("Entering task %p, thread %p", task, tself);
   162   GST_DEBUG ("Entering task %p, thread %p", task, tself);
   194 
   163 
   200     goto exit;
   169     goto exit;
   201   lock = GST_TASK_GET_LOCK (task);
   170   lock = GST_TASK_GET_LOCK (task);
   202   if (G_UNLIKELY (lock == NULL))
   171   if (G_UNLIKELY (lock == NULL))
   203     goto no_lock;
   172     goto no_lock;
   204   task->abidata.ABI.thread = tself;
   173   task->abidata.ABI.thread = tself;
   205   /* only update the priority when it was changed */
   174   GST_OBJECT_UNLOCK (task);
   206   if (priv->prio_set)
       
   207     g_thread_set_priority (tself, priv->priority);
       
   208   GST_OBJECT_UNLOCK (task);
       
   209 
       
   210   /* fire the enter_thread callback when we need to */
       
   211   if (priv->thr_callbacks.enter_thread)
       
   212     priv->thr_callbacks.enter_thread (task, tself, priv->thr_user_data);
       
   213 
   175 
   214   /* locking order is TASK_LOCK, LOCK */
   176   /* locking order is TASK_LOCK, LOCK */
   215   g_static_rec_mutex_lock (lock);
   177   g_static_rec_mutex_lock (lock);
   216   GST_OBJECT_LOCK (task);
   178   GST_OBJECT_LOCK (task);
   217   while (G_LIKELY (task->state != GST_TASK_STOPPED)) {
   179   while (G_LIKELY (task->state != GST_TASK_STOPPED)) {
   245 
   207 
   246   GST_OBJECT_LOCK (task);
   208   GST_OBJECT_LOCK (task);
   247   task->abidata.ABI.thread = NULL;
   209   task->abidata.ABI.thread = NULL;
   248 
   210 
   249 exit:
   211 exit:
   250   if (priv->thr_callbacks.leave_thread) {
       
   251     /* fire the leave_thread callback when we need to. We need to do this before
       
   252      * we signal the task and with the task lock released. */
       
   253     GST_OBJECT_UNLOCK (task);
       
   254     priv->thr_callbacks.leave_thread (task, tself, priv->thr_user_data);
       
   255     GST_OBJECT_LOCK (task);
       
   256   } else {
       
   257     /* restore normal priority when releasing back into the pool, we will not
       
   258      * touch the priority when a custom callback has been installed. */
       
   259     g_thread_set_priority (tself, G_THREAD_PRIORITY_NORMAL);
       
   260   }
       
   261   /* now we allow messing with the lock again by setting the running flag to
   212   /* now we allow messing with the lock again by setting the running flag to
   262    * FALSE. Together with the SIGNAL this is the sign for the _join() to 
   213    * FALSE. Together with the SIGNAL this is the sign for the _join() to 
   263    * complete. 
   214    * complete. 
   264    * Note that we still have not dropped the final ref on the task. We could
   215    * Note that we still have not dropped the final ref on the task. We could
   265    * check here if there is a pending join() going on and drop the last ref
   216    * check here if there is a pending join() going on and drop the last ref
   283 
   234 
   284 /**
   235 /**
   285  * gst_task_cleanup_all:
   236  * gst_task_cleanup_all:
   286  *
   237  *
   287  * Wait for all tasks to be stopped. This is mainly used internally
   238  * Wait for all tasks to be stopped. This is mainly used internally
   288  * to ensure proper cleanup of internal data structures in test suites.
   239  * to ensure proper cleanup of internal datastructures in testsuites.
   289  *
   240  *
   290  * MT safe.
   241  * MT safe.
   291  */
   242  */
   292 #ifdef __SYMBIAN32__
   243 #ifdef __SYMBIAN32__
   293 EXPORT_C
   244 EXPORT_C
   297 gst_task_cleanup_all (void)
   248 gst_task_cleanup_all (void)
   298 {
   249 {
   299   GstTaskClass *klass;
   250   GstTaskClass *klass;
   300 
   251 
   301   if ((klass = g_type_class_peek (GST_TYPE_TASK))) {
   252   if ((klass = g_type_class_peek (GST_TYPE_TASK))) {
   302     init_klass_pool (klass);
   253     g_static_mutex_lock (&pool_lock);
       
   254     if (klass->pool) {
       
   255       /* Shut down all the threads, we still process the ones scheduled
       
   256        * because the unref happens in the thread function.
       
   257        * Also wait for currently running ones to finish. */
       
   258       g_thread_pool_free (klass->pool, FALSE, TRUE);
       
   259       /* create new pool, so we can still do something after this
       
   260        * call. */
       
   261       klass->pool = g_thread_pool_new (
       
   262           (GFunc) gst_task_func, klass, -1, FALSE, NULL);
       
   263     }
       
   264     g_static_mutex_unlock (&pool_lock);
   303   }
   265   }
   304 }
   266 }
   305 
   267 
   306 /**
   268 /**
   307  * gst_task_create:
   269  * gst_task_create:
   308  * @func: The #GstTaskFunction to use
   270  * @func: The #GstTaskFunction to use
   309  * @data: User data to pass to @func
   271  * @data: User data to pass to @func
   310  *
   272  *
   311  * Create a new Task that will repeatedly call the provided @func
   273  * Create a new Task that will repeadedly call the provided @func
   312  * with @data as a parameter. Typically the task will run in
   274  * with @data as a parameter. Typically the task will run in
   313  * a new thread.
   275  * a new thread.
   314  *
   276  *
   315  * The function cannot be changed after the task has been created. You
   277  * The function cannot be changed after the task has been created. You
   316  * must create a new #GstTask to change the function.
   278  * must create a new GstTask to change the function.
   317  *
       
   318  * This function will not yet create and start a thread. Use gst_task_start() or
       
   319  * gst_task_pause() to create and start the GThread.
       
   320  *
       
   321  * Before the task can be used, a #GStaticRecMutex must be configured using the
       
   322  * gst_task_set_lock() function. This lock will always be acquired while
       
   323  * @func is called.
       
   324  *
   279  *
   325  * Returns: A new #GstTask.
   280  * Returns: A new #GstTask.
   326  *
   281  *
   327  * MT safe.
   282  * MT safe.
   328  */
   283  */
   345 }
   300 }
   346 
   301 
   347 /**
   302 /**
   348  * gst_task_set_lock:
   303  * gst_task_set_lock:
   349  * @task: The #GstTask to use
   304  * @task: The #GstTask to use
   350  * @mutex: The #GStaticRecMutex to use
   305  * @mutex: The GMutex to use
   351  *
   306  *
   352  * Set the mutex used by the task. The mutex will be acquired before
   307  * Set the mutex used by the task. The mutex will be acquired before
   353  * calling the #GstTaskFunction.
   308  * calling the #GstTaskFunction.
   354  *
   309  *
   355  * This function has to be called before calling gst_task_pause() or
   310  * This function has to be called before calling gst_task_pause() or
   378     GST_OBJECT_UNLOCK (task);
   333     GST_OBJECT_UNLOCK (task);
   379     g_warning ("cannot call set_lock on a running task");
   334     g_warning ("cannot call set_lock on a running task");
   380   }
   335   }
   381 }
   336 }
   382 
   337 
   383 /**
       
   384  * gst_task_set_priority:
       
   385  * @task: a #GstTask
       
   386  * @priority: a new priority for @task
       
   387  *
       
   388  * Changes the priority of @task to @priority.
       
   389  *
       
   390  * Note: try not to depend on task priorities.
       
   391  *
       
   392  * MT safe.
       
   393  *
       
   394  * Since: 0.10.24
       
   395  */
       
   396 #ifdef __SYMBIAN32__
       
   397 EXPORT_C
       
   398 #endif
       
   399 
       
   400 void
       
   401 gst_task_set_priority (GstTask * task, GThreadPriority priority)
       
   402 {
       
   403   GstTaskPrivate *priv;
       
   404   GThread *thread;
       
   405 
       
   406   g_return_if_fail (GST_IS_TASK (task));
       
   407 
       
   408   priv = task->priv;
       
   409 
       
   410   GST_OBJECT_LOCK (task);
       
   411   priv->prio_set = TRUE;
       
   412   priv->priority = priority;
       
   413   thread = task->abidata.ABI.thread;
       
   414   if (thread != NULL) {
       
   415     /* if this task already has a thread, we can configure the priority right
       
   416      * away, else we do that when we assign a thread to the task. */
       
   417     g_thread_set_priority (thread, priority);
       
   418   }
       
   419   GST_OBJECT_UNLOCK (task);
       
   420 }
       
   421 
       
   422 /**
       
   423  * gst_task_get_pool:
       
   424  * @task: a #GstTask
       
   425  *
       
   426  * Get the #GstTaskPool that this task will use for its streaming
       
   427  * threads.
       
   428  *
       
   429  * MT safe.
       
   430  *
       
   431  * Returns: the #GstTaskPool used by @task. gst_object_unref()
       
   432  * after usage.
       
   433  *
       
   434  * Since: 0.10.24
       
   435  */
       
   436 #ifdef __SYMBIAN32__
       
   437 EXPORT_C
       
   438 #endif
       
   439 
       
   440 GstTaskPool *
       
   441 gst_task_get_pool (GstTask * task)
       
   442 {
       
   443   GstTaskPool *result;
       
   444   GstTaskPrivate *priv;
       
   445 
       
   446   g_return_val_if_fail (GST_IS_TASK (task), NULL);
       
   447 
       
   448   priv = task->priv;
       
   449 
       
   450   GST_OBJECT_LOCK (task);
       
   451   result = gst_object_ref (priv->pool);
       
   452   GST_OBJECT_UNLOCK (task);
       
   453 
       
   454   return result;
       
   455 }
       
   456 
       
   457 /**
       
   458  * gst_task_set_pool:
       
   459  * @task: a #GstTask
       
   460  * @pool: a #GstTaskPool
       
   461  *
       
   462  * Set @pool as the new GstTaskPool for @task. Any new streaming threads that
       
   463  * will be created by @task will now use @pool.
       
   464  *
       
   465  * MT safe.
       
   466  *
       
   467  * Since: 0.10.24
       
   468  */
       
   469 #ifdef __SYMBIAN32__
       
   470 EXPORT_C
       
   471 #endif
       
   472 
       
   473 void
       
   474 gst_task_set_pool (GstTask * task, GstTaskPool * pool)
       
   475 {
       
   476   GstTaskPool *old;
       
   477   GstTaskPrivate *priv;
       
   478 
       
   479   g_return_if_fail (GST_IS_TASK (task));
       
   480   g_return_if_fail (GST_IS_TASK_POOL (pool));
       
   481 
       
   482   priv = task->priv;
       
   483 
       
   484   GST_OBJECT_LOCK (task);
       
   485   if (priv->pool != pool) {
       
   486     old = priv->pool;
       
   487     priv->pool = gst_object_ref (pool);
       
   488   } else
       
   489     old = NULL;
       
   490   GST_OBJECT_UNLOCK (task);
       
   491 
       
   492   if (old)
       
   493     gst_object_unref (old);
       
   494 }
       
   495 
       
   496 
       
   497 /**
       
   498  * gst_task_set_thread_callbacks:
       
   499  * @task: The #GstTask to use
       
   500  * @callbacks: a #GstTaskThreadCallbacks pointer
       
   501  * @user_data: user data passed to the callbacks
       
   502  * @notify: called when @user_data is no longer referenced
       
   503  *
       
   504  * Set callbacks which will be executed when a new thread is needed, the thread
       
   505  * function is entered and left and when the thread is joined.
       
   506  *
       
   507  * By default a thread for @task will be created from a default thread pool.
       
   508  *
       
   509  * Objects can use custom GThreads or can perform additional configuration of
       
   510  * the threads (such as changing the thread priority) by installing callbacks.
       
   511  *
       
   512  * MT safe.
       
   513  *
       
   514  * Since: 0.10.24
       
   515  */
       
   516 #ifdef __SYMBIAN32__
       
   517 EXPORT_C
       
   518 #endif
       
   519 
       
   520 void
       
   521 gst_task_set_thread_callbacks (GstTask * task,
       
   522     GstTaskThreadCallbacks * callbacks, gpointer user_data,
       
   523     GDestroyNotify notify)
       
   524 {
       
   525   GDestroyNotify old_notify;
       
   526 
       
   527   g_return_if_fail (task != NULL);
       
   528   g_return_if_fail (GST_IS_TASK (task));
       
   529   g_return_if_fail (callbacks != NULL);
       
   530 
       
   531   GST_OBJECT_LOCK (task);
       
   532   old_notify = task->priv->thr_notify;
       
   533 
       
   534   if (old_notify) {
       
   535     gpointer old_data;
       
   536 
       
   537     old_data = task->priv->thr_user_data;
       
   538 
       
   539     task->priv->thr_user_data = NULL;
       
   540     task->priv->thr_notify = NULL;
       
   541     GST_OBJECT_UNLOCK (task);
       
   542 
       
   543     old_notify (old_data);
       
   544 
       
   545     GST_OBJECT_LOCK (task);
       
   546   }
       
   547   task->priv->thr_callbacks = *callbacks;
       
   548   task->priv->thr_user_data = user_data;
       
   549   task->priv->thr_notify = notify;
       
   550   GST_OBJECT_UNLOCK (task);
       
   551 }
       
   552 
   338 
   553 /**
   339 /**
   554  * gst_task_get_state:
   340  * gst_task_get_state:
   555  * @task: The #GstTask to query
   341  * @task: The #GstTask to query
   556  *
   342  *
   576   GST_OBJECT_UNLOCK (task);
   362   GST_OBJECT_UNLOCK (task);
   577 
   363 
   578   return result;
   364   return result;
   579 }
   365 }
   580 
   366 
   581 /* make sure the task is running and start a thread if it's not.
   367 /**
   582  * This function must be called with the task LOCK. */
   368  * gst_task_start:
   583 static gboolean
   369  * @task: The #GstTask to start
   584 start_task (GstTask * task)
   370  *
   585 {
   371  * Starts @task. The @task must have a lock associated with it using
   586   gboolean res = TRUE;
   372  * gst_task_set_lock() or thsi function will return FALSE.
   587   GstTaskClass *tclass;
   373  *
   588   GError *error = NULL;
   374  * Returns: TRUE if the task could be started.
   589   GstTaskPrivate *priv;
   375  *
   590 
   376  * MT safe.
   591   priv = task->priv;
       
   592 
       
   593   /* new task, We ref before so that it remains alive while
       
   594    * the thread is running. */
       
   595   gst_object_ref (task);
       
   596   /* mark task as running so that a join will wait until we schedule
       
   597    * and exit the task function. */
       
   598   task->running = TRUE;
       
   599 
       
   600   tclass = GST_TASK_GET_CLASS (task);
       
   601 
       
   602   /* push on the thread pool, we remember the original pool because the user
       
   603    * could change it later on and then we join to the wrong pool. */
       
   604   priv->pool_id = gst_object_ref (priv->pool);
       
   605   priv->id =
       
   606       gst_task_pool_push (priv->pool_id, (GstTaskPoolFunction) gst_task_func,
       
   607       task, &error);
       
   608 
       
   609   if (error != NULL) {
       
   610     g_warning ("failed to create thread: %s", error->message);
       
   611     g_error_free (error);
       
   612     res = FALSE;
       
   613   }
       
   614   return res;
       
   615 }
       
   616 
       
   617 
       
   618 /**
       
   619  * gst_task_set_state:
       
   620  * @task: a #GstTask
       
   621  * @state: the new task state
       
   622  *
       
   623  * Sets the state of @task to @state.
       
   624  *
       
   625  * The @task must have a lock associated with it using
       
   626  * gst_task_set_lock() when going to GST_TASK_STARTED or GST_TASK_PAUSED or
       
   627  * this function will return %FALSE.
       
   628  *
       
   629  * MT safe.
       
   630  *
       
   631  * Returns: %TRUE if the state could be changed.
       
   632  *
       
   633  * Since: 0.10.24
       
   634  */
   377  */
   635 #ifdef __SYMBIAN32__
   378 #ifdef __SYMBIAN32__
   636 EXPORT_C
   379 EXPORT_C
   637 #endif
   380 #endif
   638 
   381 
   639 gboolean
   382 gboolean
   640 gst_task_set_state (GstTask * task, GstTaskState state)
   383 gst_task_start (GstTask * task)
   641 {
   384 {
   642   GstTaskState old;
   385   GstTaskState old;
   643   gboolean res = TRUE;
       
   644 
   386 
   645   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
   387   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
   646 
   388 
   647   GST_DEBUG_OBJECT (task, "Changing task %p to state %d", task, state);
   389   GST_DEBUG_OBJECT (task, "Starting task %p", task);
   648 
   390 
   649   GST_OBJECT_LOCK (task);
   391   GST_OBJECT_LOCK (task);
   650   if (state != GST_TASK_STOPPED)
   392   if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL))
   651     if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL))
   393     goto no_lock;
   652       goto no_lock;
   394 
   653 
       
   654   /* if the state changed, do our thing */
       
   655   old = task->state;
   395   old = task->state;
   656   if (old != state) {
   396   task->state = GST_TASK_STARTED;
   657     task->state = state;
   397   switch (old) {
   658     switch (old) {
   398     case GST_TASK_STOPPED:
   659       case GST_TASK_STOPPED:
   399     {
   660         /* If the task already has a thread scheduled we don't have to do
   400       GstTaskClass *tclass;
   661          * anything. */
   401 
   662         if (G_UNLIKELY (!task->running))
   402       /* If the task already has a thread scheduled we don't have to do
   663           res = start_task (task);
   403        * anything. */
       
   404       if (task->running)
   664         break;
   405         break;
   665       case GST_TASK_PAUSED:
   406 
   666         /* when we are paused, signal to go to the new state */
   407       /* new task, push on threadpool. We ref before so
   667         GST_TASK_SIGNAL (task);
   408        * that it remains alive while on the threadpool. */
   668         break;
   409       gst_object_ref (task);
   669       case GST_TASK_STARTED:
   410       /* mark task as running so that a join will wait until we schedule
   670         /* if we were started, we'll go to the new state after the next
   411        * and exit the task function. */
   671          * iteration. */
   412       task->running = TRUE;
   672         break;
   413 
       
   414       tclass = GST_TASK_GET_CLASS (task);
       
   415 
       
   416       g_static_mutex_lock (&pool_lock);
       
   417       g_thread_pool_push (tclass->pool, task, NULL);
       
   418       g_static_mutex_unlock (&pool_lock);
       
   419       break;
   673     }
   420     }
   674   }
   421     case GST_TASK_PAUSED:
   675   GST_OBJECT_UNLOCK (task);
   422       /* PAUSE to PLAY, signal */
   676 
   423       GST_TASK_SIGNAL (task);
   677   return res;
   424       break;
       
   425     case GST_TASK_STARTED:
       
   426       /* was OK */
       
   427       break;
       
   428   }
       
   429   GST_OBJECT_UNLOCK (task);
       
   430 
       
   431   return TRUE;
   678 
   432 
   679   /* ERRORS */
   433   /* ERRORS */
   680 no_lock:
   434 no_lock:
   681   {
   435   {
   682     GST_WARNING_OBJECT (task, "state %d set on task without a lock", state);
   436     GST_WARNING_OBJECT (task, "starting task without a lock");
   683     GST_OBJECT_UNLOCK (task);
   437     GST_OBJECT_UNLOCK (task);
   684     g_warning ("task without a lock can't be set to state %d", state);
   438     g_warning ("starting task without a lock");
   685     return FALSE;
   439     return FALSE;
   686   }
   440   }
   687 }
       
   688 
       
   689 /**
       
   690  * gst_task_start:
       
   691  * @task: The #GstTask to start
       
   692  *
       
   693  * Starts @task. The @task must have a lock associated with it using
       
   694  * gst_task_set_lock() or this function will return %FALSE.
       
   695  *
       
   696  * Returns: %TRUE if the task could be started.
       
   697  *
       
   698  * MT safe.
       
   699  */
       
   700 #ifdef __SYMBIAN32__
       
   701 EXPORT_C
       
   702 #endif
       
   703 
       
   704 gboolean
       
   705 gst_task_start (GstTask * task)
       
   706 {
       
   707   return gst_task_set_state (task, GST_TASK_STARTED);
       
   708 }
   441 }
   709 
   442 
   710 /**
   443 /**
   711  * gst_task_stop:
   444  * gst_task_stop:
   712  * @task: The #GstTask to stop
   445  * @task: The #GstTask to stop
   713  *
   446  *
   714  * Stops @task. This method merely schedules the task to stop and
   447  * Stops @task. This method merely schedules the task to stop and
   715  * will not wait for the task to have completely stopped. Use
   448  * will not wait for the task to have completely stopped. Use
   716  * gst_task_join() to stop and wait for completion.
   449  * gst_task_join() to stop and wait for completion.
   717  *
   450  *
   718  * Returns: %TRUE if the task could be stopped.
   451  * Returns: TRUE if the task could be stopped.
   719  *
   452  *
   720  * MT safe.
   453  * MT safe.
   721  */
   454  */
   722 #ifdef __SYMBIAN32__
   455 #ifdef __SYMBIAN32__
   723 EXPORT_C
   456 EXPORT_C
   724 #endif
   457 #endif
   725 
   458 
   726 gboolean
   459 gboolean
   727 gst_task_stop (GstTask * task)
   460 gst_task_stop (GstTask * task)
   728 {
   461 {
   729   return gst_task_set_state (task, GST_TASK_STOPPED);
   462   GstTaskClass *tclass;
       
   463   GstTaskState old;
       
   464 
       
   465   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
       
   466 
       
   467   tclass = GST_TASK_GET_CLASS (task);
       
   468 
       
   469   GST_DEBUG_OBJECT (task, "Stopping task %p", task);
       
   470 
       
   471   GST_OBJECT_LOCK (task);
       
   472   old = task->state;
       
   473   task->state = GST_TASK_STOPPED;
       
   474   switch (old) {
       
   475     case GST_TASK_STOPPED:
       
   476       break;
       
   477     case GST_TASK_PAUSED:
       
   478       GST_TASK_SIGNAL (task);
       
   479       break;
       
   480     case GST_TASK_STARTED:
       
   481       break;
       
   482   }
       
   483   GST_OBJECT_UNLOCK (task);
       
   484 
       
   485   return TRUE;
   730 }
   486 }
   731 
   487 
   732 /**
   488 /**
   733  * gst_task_pause:
   489  * gst_task_pause:
   734  * @task: The #GstTask to pause
   490  * @task: The #GstTask to pause
   736  * Pauses @task. This method can also be called on a task in the
   492  * Pauses @task. This method can also be called on a task in the
   737  * stopped state, in which case a thread will be started and will remain
   493  * stopped state, in which case a thread will be started and will remain
   738  * in the paused state. This function does not wait for the task to complete
   494  * in the paused state. This function does not wait for the task to complete
   739  * the paused state.
   495  * the paused state.
   740  *
   496  *
   741  * Returns: %TRUE if the task could be paused.
   497  * Returns: TRUE if the task could be paused.
   742  *
   498  *
   743  * MT safe.
   499  * MT safe.
   744  */
   500  */
   745 #ifdef __SYMBIAN32__
   501 #ifdef __SYMBIAN32__
   746 EXPORT_C
   502 EXPORT_C
   747 #endif
   503 #endif 
   748 gboolean
   504 gboolean
   749 gst_task_pause (GstTask * task)
   505 gst_task_pause (GstTask * task)
   750 {
   506 {
   751   return gst_task_set_state (task, GST_TASK_PAUSED);
   507   GstTaskState old;
       
   508 
       
   509   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
       
   510 
       
   511   GST_DEBUG_OBJECT (task, "Pausing task %p", task);
       
   512 
       
   513   GST_OBJECT_LOCK (task);
       
   514   if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL))
       
   515     goto no_lock;
       
   516 
       
   517   old = task->state;
       
   518   task->state = GST_TASK_PAUSED;
       
   519   switch (old) {
       
   520     case GST_TASK_STOPPED:
       
   521     {
       
   522       GstTaskClass *tclass;
       
   523 
       
   524       if (task->running)
       
   525         break;
       
   526 
       
   527       gst_object_ref (task);
       
   528       task->running = TRUE;
       
   529 
       
   530       tclass = GST_TASK_GET_CLASS (task);
       
   531 
       
   532       g_static_mutex_lock (&pool_lock);
       
   533       g_thread_pool_push (tclass->pool, task, NULL);
       
   534       g_static_mutex_unlock (&pool_lock);
       
   535       break;
       
   536     }
       
   537     case GST_TASK_PAUSED:
       
   538       break;
       
   539     case GST_TASK_STARTED:
       
   540       break;
       
   541   }
       
   542   GST_OBJECT_UNLOCK (task);
       
   543 
       
   544   return TRUE;
       
   545 
       
   546   /* ERRORS */
       
   547 no_lock:
       
   548   {
       
   549     GST_WARNING_OBJECT (task, "pausing task without a lock");
       
   550     GST_OBJECT_UNLOCK (task);
       
   551     g_warning ("pausing task without a lock");
       
   552     return FALSE;
       
   553   }
   752 }
   554 }
   753 
   555 
   754 /**
   556 /**
   755  * gst_task_join:
   557  * gst_task_join:
   756  * @task: The #GstTask to join
   558  * @task: The #GstTask to join
   762  *
   564  *
   763  * This function cannot be called from within a task function as this
   565  * This function cannot be called from within a task function as this
   764  * would cause a deadlock. The function will detect this and print a 
   566  * would cause a deadlock. The function will detect this and print a 
   765  * g_warning.
   567  * g_warning.
   766  *
   568  *
   767  * Returns: %TRUE if the task could be joined.
   569  * Returns: TRUE if the task could be joined.
   768  *
   570  *
   769  * MT safe.
   571  * MT safe.
   770  */
   572  */
   771 #ifdef __SYMBIAN32__
   573 #ifdef __SYMBIAN32__
   772 EXPORT_C
   574 EXPORT_C
   774 
   576 
   775 gboolean
   577 gboolean
   776 gst_task_join (GstTask * task)
   578 gst_task_join (GstTask * task)
   777 {
   579 {
   778   GThread *tself;
   580   GThread *tself;
   779   GstTaskPrivate *priv;
       
   780   gpointer id;
       
   781   GstTaskPool *pool = NULL;
       
   782 
       
   783   priv = task->priv;
       
   784 
   581 
   785   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
   582   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
   786 
   583 
   787   tself = g_thread_self ();
   584   tself = g_thread_self ();
   788 
   585 
   789   GST_DEBUG_OBJECT (task, "Joining task %p, thread %p", task, tself);
   586   GST_DEBUG_OBJECT (task, "Joining task %p, thread %p", task, tself);
   790 
   587 
   791   /* we don't use a real thread join here because we are using
   588   /* we don't use a real thread join here because we are using
   792    * thread pools */
   589    * threadpools */
   793   GST_OBJECT_LOCK (task);
   590   GST_OBJECT_LOCK (task);
   794   if (G_UNLIKELY (tself == task->abidata.ABI.thread))
   591   if (G_UNLIKELY (tself == task->abidata.ABI.thread))
   795     goto joining_self;
   592     goto joining_self;
   796   task->state = GST_TASK_STOPPED;
   593   task->state = GST_TASK_STOPPED;
   797   /* signal the state change for when it was blocked in PAUSED. */
   594   /* signal the state change for when it was blocked in PAUSED. */
   798   GST_TASK_SIGNAL (task);
   595   GST_TASK_SIGNAL (task);
   799   /* we set the running flag when pushing the task on the thread pool.
   596   /* we set the running flag when pushing the task on the threadpool. 
   800    * This means that the task function might not be called when we try
   597    * This means that the task function might not be called when we try
   801    * to join it here. */
   598    * to join it here. */
   802   while (G_LIKELY (task->running))
   599   while (G_LIKELY (task->running))
   803     GST_TASK_WAIT (task);
   600     GST_TASK_WAIT (task);
   804   /* clean the thread */
   601   GST_OBJECT_UNLOCK (task);
   805   task->abidata.ABI.thread = NULL;
       
   806   /* get the id and pool to join */
       
   807   pool = priv->pool_id;
       
   808   id = priv->id;
       
   809   priv->pool_id = NULL;
       
   810   priv->id = NULL;
       
   811   GST_OBJECT_UNLOCK (task);
       
   812 
       
   813   if (pool) {
       
   814     if (id)
       
   815       gst_task_pool_join (pool, id);
       
   816     gst_object_unref (pool);
       
   817   }
       
   818 
   602 
   819   GST_DEBUG_OBJECT (task, "Joined task %p", task);
   603   GST_DEBUG_OBJECT (task, "Joined task %p", task);
   820 
   604 
   821   return TRUE;
   605   return TRUE;
   822 
   606