gstreamer_core/gst/gsttask.c
changeset 0 0e761a78d257
child 7 567bb019e3e3
equal deleted inserted replaced
-1:000000000000 0:0e761a78d257
       
     1 /* GStreamer
       
     2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
       
     3  *                    2005 Wim Taymans <wim@fluendo.com>
       
     4  *
       
     5  * gsttask.c: Streaming tasks
       
     6  *
       
     7  * This library is free software; you can redistribute it and/or
       
     8  * modify it under the terms of the GNU Library General Public
       
     9  * License as published by the Free Software Foundation; either
       
    10  * version 2 of the License, or (at your option) any later version.
       
    11  *
       
    12  * This library is distributed in the hope that it will be useful,
       
    13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
       
    15  * Library General Public License for more details.
       
    16  *
       
    17  * You should have received a copy of the GNU Library General Public
       
    18  * License along with this library; if not, write to the
       
    19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
       
    20  * Boston, MA 02111-1307, USA.
       
    21  */
       
    22 
       
    23 /**
       
    24  * SECTION:gsttask
       
    25  * @short_description: Abstraction of GStreamer streaming threads.
       
    26  * @see_also: #GstElement, #GstPad
       
    27  *
       
    28  * #GstTask is used by #GstElement and #GstPad to provide the data passing
       
    29  * threads in a #GstPipeline.
       
    30  *
       
    31  * A #GstPad will typically start a #GstTask to push or pull data to/from the
       
    32  * peer pads. Most source elements start a #GstTask to push data. In some cases
       
    33  * a demuxer element can start a #GstTask to pull data from a peer element. This
       
    34  * is typically done when the demuxer can perform random access on the upstream
       
    35  * peer element for improved performance.
       
    36  *
       
    37  * Although convenience functions exist on #GstPad to start/pause/stop tasks, it 
       
    38  * might sometimes be needed to create a #GstTask manually if it is not related to
       
    39  * a #GstPad.
       
    40  *
       
    41  * Before the #GstTask can be run, it needs a #GStaticRecMutex that can be set with
       
    42  * gst_task_set_lock().
       
    43  *
       
    44  * The task can be started, paused and stopped with gst_task_start(), gst_task_pause()
       
    45  * and gst_task_stop() respectively.
       
    46  *
       
    47  * A #GstTask will repeatedly call the #GstTaskFunction with the user data
       
    48  * that was provided when creating the task with gst_task_create(). Before calling
       
    49  * the function it will acquire the provided lock.
       
    50  *
       
    51  * Stopping a task with gst_task_stop() will not immediately make sure the task is
       
    52  * not running anymore. Use gst_task_join() to make sure the task is completely
       
    53  * stopped and the thread is stopped.
       
    54  *
       
    55  * After creating a #GstTask, use gst_object_unref() to free its resources. This can
       
    56  * only be done if the task is not running anymore.
       
    57  *
       
    58  * Last reviewed on 2006-02-13 (0.10.4)
       
    59  */
       
    60 
       
    61 #include "gst_private.h"
       
    62 
       
    63 #include "gstinfo.h"
       
    64 #include "gsttask.h"
       
    65 
       
    66 #ifdef __SYMBIAN32__
       
    67 #include <glib_global.h>
       
    68 #endif
       
    69 
       
/* Debug category used by every GST_DEBUG / GST_WARNING call in this file;
 * it is initialised in gst_task_get_type(). */
GST_DEBUG_CATEGORY_STATIC (task_debug);
#define GST_CAT_DEFAULT (task_debug)

/* GObject type plumbing, wired up in gst_task_get_type() and
 * gst_task_class_init(). */
static void gst_task_class_init (GstTaskClass * klass);
static void gst_task_init (GstTask * task);
static void gst_task_finalize (GObject * object);

/* Worker function handed to the class-wide thread pool. */
static void gst_task_func (GstTask * task, GstTaskClass * tclass);

/* Parent class, filled in by gst_task_class_init() and used to chain up
 * from gst_task_finalize(). */
static GstObjectClass *parent_class = NULL;

/* Taken around every use of the class thread pool in this file (pushing
 * tasks in gst_task_start()/gst_task_pause() and replacing the pool in
 * gst_task_cleanup_all()). */
static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT;
       
    82 #ifdef __SYMBIAN32__
       
    83 EXPORT_C
       
    84 #endif
       
    85 
       
    86 
       
    87 GType
       
    88 gst_task_get_type (void)
       
    89 {
       
    90   static GType _gst_task_type = 0;
       
    91 
       
    92   if (G_UNLIKELY (_gst_task_type == 0)) {
       
    93     static const GTypeInfo task_info = {
       
    94       sizeof (GstTaskClass),
       
    95       NULL,
       
    96       NULL,
       
    97       (GClassInitFunc) gst_task_class_init,
       
    98       NULL,
       
    99       NULL,
       
   100       sizeof (GstTask),
       
   101       0,
       
   102       (GInstanceInitFunc) gst_task_init,
       
   103       NULL
       
   104     };
       
   105 
       
   106     _gst_task_type =
       
   107         g_type_register_static (GST_TYPE_OBJECT, "GstTask", &task_info, 0);
       
   108 
       
   109     GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks");
       
   110   }
       
   111   return _gst_task_type;
       
   112 }
       
   113 
       
   114 static void
       
   115 gst_task_class_init (GstTaskClass * klass)
       
   116 {
       
   117   GObjectClass *gobject_class;
       
   118 
       
   119   gobject_class = (GObjectClass *) klass;
       
   120 
       
   121   parent_class = g_type_class_peek_parent (klass);
       
   122 
       
   123   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_task_finalize);
       
   124 
       
   125   klass->pool = g_thread_pool_new (
       
   126       (GFunc) gst_task_func, klass, -1, FALSE, NULL);
       
   127 }
       
   128 
       
   129 static void
       
   130 gst_task_init (GstTask * task)
       
   131 {
       
   132   task->running = FALSE;
       
   133   task->abidata.ABI.thread = NULL;
       
   134   task->lock = NULL;
       
   135   task->cond = g_cond_new ();
       
   136   task->state = GST_TASK_STOPPED;
       
   137 }
       
   138 
       
   139 static void
       
   140 gst_task_finalize (GObject * object)
       
   141 {
       
   142   GstTask *task = GST_TASK (object);
       
   143 
       
   144   GST_DEBUG ("task %p finalize", task);
       
   145 
       
   146   /* task thread cannot be running here since it holds a ref
       
   147    * to the task so that the finalize could not have happened */
       
   148   g_cond_free (task->cond);
       
   149   task->cond = NULL;
       
   150 
       
   151   G_OBJECT_CLASS (parent_class)->finalize (object);
       
   152 }
       
   153 
       
/* Thread-pool worker for one scheduled GstTask.
 *
 * @task is the item pushed on the pool by gst_task_start() or
 * gst_task_pause(); @tclass is the pool's user data and is not used here.
 *
 * While the task is not STOPPED the function repeatedly calls task->func
 * with task->data, holding the task's GStaticRecMutex around each call.
 * While the task is PAUSED it releases that mutex and waits to be
 * signalled. On the way out it clears task->running, signals waiters (see
 * gst_task_join()) and drops the reference that was taken when the task
 * was pushed on the pool. */
static void
gst_task_func (GstTask * task, GstTaskClass * tclass)
{
  GStaticRecMutex *lock;
  GThread *tself;

  tself = g_thread_self ();

  GST_DEBUG ("Entering task %p, thread %p", task, tself);

  /* we have to grab the lock to get the mutex. We also
   * mark our state running so that nobody can mess with
   * the mutex. */
  GST_OBJECT_LOCK (task);
  /* the task may already have been stopped again before the pool got
   * around to scheduling us; leave with the object lock still held, as
   * the exit path expects. */
  if (task->state == GST_TASK_STOPPED)
    goto exit;
  lock = GST_TASK_GET_LOCK (task);
  if (G_UNLIKELY (lock == NULL))
    goto no_lock;
  /* remember which thread runs the task so that gst_task_join() can
   * detect an attempt to join from within the task function. */
  task->abidata.ABI.thread = tself;
  GST_OBJECT_UNLOCK (task);

  /* locking order is TASK_LOCK, LOCK */
  g_static_rec_mutex_lock (lock);
  GST_OBJECT_LOCK (task);
  while (G_LIKELY (task->state != GST_TASK_STOPPED)) {
    while (G_UNLIKELY (task->state == GST_TASK_PAUSED)) {
      gint t;

      /* fully release the recursive mutex while paused, remembering the
       * recursion depth in t so it can be restored afterwards. */
      t = g_static_rec_mutex_unlock_full (lock);
      if (t <= 0) {
        g_warning ("wrong STREAM_LOCK count %d", t);
      }
      /* wake up anybody waiting on the task, then block until the state
       * is changed and we are signalled.
       * NOTE(review): GST_TASK_WAIT is presumably a condition wait on
       * task->cond using the object lock - confirm in gsttask.h. */
      GST_TASK_SIGNAL (task);
      GST_TASK_WAIT (task);
      GST_OBJECT_UNLOCK (task);
      /* locking order.. */
      if (t > 0)
        g_static_rec_mutex_lock_full (lock, t);

      GST_OBJECT_LOCK (task);
      /* stopped while we were paused: skip the task function entirely. */
      if (G_UNLIKELY (task->state == GST_TASK_STOPPED))
        goto done;
    }
    GST_OBJECT_UNLOCK (task);

    /* one iteration of the user function, called without the object lock
     * but with the task's recursive mutex held. */
    task->func (task->data);

    GST_OBJECT_LOCK (task);
  }
done:
  /* reached with both the object lock and the recursive mutex held. */
  GST_OBJECT_UNLOCK (task);
  g_static_rec_mutex_unlock (lock);

  GST_OBJECT_LOCK (task);
  task->abidata.ABI.thread = NULL;

exit:
  /* now we allow messing with the lock again by setting the running flag to
   * FALSE. Together with the SIGNAL this is the sign for the _join() to
   * complete.
   * Note that we still have not dropped the final ref on the task. We could
   * check here if there is a pending join() going on and drop the last ref
   * before releasing the lock as we can be sure that a ref is held by the
   * caller of the join(). */
  task->running = FALSE;
  GST_TASK_SIGNAL (task);
  GST_OBJECT_UNLOCK (task);

  GST_DEBUG ("Exit task %p, thread %p", task, g_thread_self ());

  /* drop the reference taken when the task was pushed on the pool. */
  gst_object_unref (task);
  return;

no_lock:
  {
    /* no mutex was configured; the object lock is still held here, which
     * is what the exit path expects. */
    g_warning ("starting task without a lock");
    goto exit;
  }
}
       
   234 
       
   235 /**
       
   236  * gst_task_cleanup_all:
       
   237  *
       
   238  * Wait for all tasks to be stopped. This is mainly used internally
       
   239  * to ensure proper cleanup of internal datastructures in testsuites.
       
   240  *
       
   241  * MT safe.
       
   242  */
       
   243 #ifdef __SYMBIAN32__
       
   244 EXPORT_C
       
   245 #endif
       
   246 
       
   247 void
       
   248 gst_task_cleanup_all (void)
       
   249 {
       
   250   GstTaskClass *klass;
       
   251 
       
   252   if ((klass = g_type_class_peek (GST_TYPE_TASK))) {
       
   253     g_static_mutex_lock (&pool_lock);
       
   254     if (klass->pool) {
       
   255       /* Shut down all the threads, we still process the ones scheduled
       
   256        * because the unref happens in the thread function.
       
   257        * Also wait for currently running ones to finish. */
       
   258       g_thread_pool_free (klass->pool, FALSE, TRUE);
       
   259       /* create new pool, so we can still do something after this
       
   260        * call. */
       
   261       klass->pool = g_thread_pool_new (
       
   262           (GFunc) gst_task_func, klass, -1, FALSE, NULL);
       
   263     }
       
   264     g_static_mutex_unlock (&pool_lock);
       
   265   }
       
   266 }
       
   267 
       
   268 /**
       
   269  * gst_task_create:
       
   270  * @func: The #GstTaskFunction to use
       
   271  * @data: User data to pass to @func
       
   272  *
       
   273  * Create a new Task that will repeadedly call the provided @func
       
   274  * with @data as a parameter. Typically the task will run in
       
   275  * a new thread.
       
   276  *
       
   277  * The function cannot be changed after the task has been created. You
       
   278  * must create a new GstTask to change the function.
       
   279  *
       
   280  * Returns: A new #GstTask.
       
   281  *
       
   282  * MT safe.
       
   283  */
       
   284 #ifdef __SYMBIAN32__
       
   285 EXPORT_C
       
   286 #endif
       
   287 
       
   288 GstTask *
       
   289 gst_task_create (GstTaskFunction func, gpointer data)
       
   290 {
       
   291   GstTask *task;
       
   292 
       
   293   task = g_object_new (GST_TYPE_TASK, NULL);
       
   294   task->func = func;
       
   295   task->data = data;
       
   296 
       
   297   GST_DEBUG ("Created task %p", task);
       
   298 
       
   299   return task;
       
   300 }
       
   301 
       
   302 /**
       
   303  * gst_task_set_lock:
       
   304  * @task: The #GstTask to use
       
   305  * @mutex: The GMutex to use
       
   306  *
       
   307  * Set the mutex used by the task. The mutex will be acquired before
       
   308  * calling the #GstTaskFunction.
       
   309  *
       
   310  * This function has to be called before calling gst_task_pause() or
       
   311  * gst_task_start().
       
   312  *
       
   313  * MT safe.
       
   314  */
       
   315 #ifdef __SYMBIAN32__
       
   316 EXPORT_C
       
   317 #endif
       
   318 
       
   319 void
       
   320 gst_task_set_lock (GstTask * task, GStaticRecMutex * mutex)
       
   321 {
       
   322   GST_OBJECT_LOCK (task);
       
   323   if (G_UNLIKELY (task->running))
       
   324     goto is_running;
       
   325   GST_TASK_GET_LOCK (task) = mutex;
       
   326   GST_OBJECT_UNLOCK (task);
       
   327 
       
   328   return;
       
   329 
       
   330   /* ERRORS */
       
   331 is_running:
       
   332   {
       
   333     GST_OBJECT_UNLOCK (task);
       
   334     g_warning ("cannot call set_lock on a running task");
       
   335   }
       
   336 }
       
   337 
       
   338 
       
   339 /**
       
   340  * gst_task_get_state:
       
   341  * @task: The #GstTask to query
       
   342  *
       
   343  * Get the current state of the task.
       
   344  *
       
   345  * Returns: The #GstTaskState of the task
       
   346  *
       
   347  * MT safe.
       
   348  */
       
   349 #ifdef __SYMBIAN32__
       
   350 EXPORT_C
       
   351 #endif
       
   352 
       
   353 GstTaskState
       
   354 gst_task_get_state (GstTask * task)
       
   355 {
       
   356   GstTaskState result;
       
   357 
       
   358   g_return_val_if_fail (GST_IS_TASK (task), GST_TASK_STOPPED);
       
   359 
       
   360   GST_OBJECT_LOCK (task);
       
   361   result = task->state;
       
   362   GST_OBJECT_UNLOCK (task);
       
   363 
       
   364   return result;
       
   365 }
       
   366 
       
   367 /**
       
   368  * gst_task_start:
       
   369  * @task: The #GstTask to start
       
   370  *
       
   371  * Starts @task. The @task must have a lock associated with it using
       
   372  * gst_task_set_lock() or thsi function will return FALSE.
       
   373  *
       
   374  * Returns: TRUE if the task could be started.
       
   375  *
       
   376  * MT safe.
       
   377  */
       
   378 #ifdef __SYMBIAN32__
       
   379 EXPORT_C
       
   380 #endif
       
   381 
       
   382 gboolean
       
   383 gst_task_start (GstTask * task)
       
   384 {
       
   385   GstTaskState old;
       
   386 
       
   387   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
       
   388 
       
   389   GST_DEBUG_OBJECT (task, "Starting task %p", task);
       
   390 
       
   391   GST_OBJECT_LOCK (task);
       
   392   if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL))
       
   393     goto no_lock;
       
   394 
       
   395   old = task->state;
       
   396   task->state = GST_TASK_STARTED;
       
   397   switch (old) {
       
   398     case GST_TASK_STOPPED:
       
   399     {
       
   400       GstTaskClass *tclass;
       
   401 
       
   402       /* If the task already has a thread scheduled we don't have to do
       
   403        * anything. */
       
   404       if (task->running)
       
   405         break;
       
   406 
       
   407       /* new task, push on threadpool. We ref before so
       
   408        * that it remains alive while on the threadpool. */
       
   409       gst_object_ref (task);
       
   410       /* mark task as running so that a join will wait until we schedule
       
   411        * and exit the task function. */
       
   412       task->running = TRUE;
       
   413 
       
   414       tclass = GST_TASK_GET_CLASS (task);
       
   415 
       
   416       g_static_mutex_lock (&pool_lock);
       
   417       g_thread_pool_push (tclass->pool, task, NULL);
       
   418       g_static_mutex_unlock (&pool_lock);
       
   419       break;
       
   420     }
       
   421     case GST_TASK_PAUSED:
       
   422       /* PAUSE to PLAY, signal */
       
   423       GST_TASK_SIGNAL (task);
       
   424       break;
       
   425     case GST_TASK_STARTED:
       
   426       /* was OK */
       
   427       break;
       
   428   }
       
   429   GST_OBJECT_UNLOCK (task);
       
   430 
       
   431   return TRUE;
       
   432 
       
   433   /* ERRORS */
       
   434 no_lock:
       
   435   {
       
   436     GST_WARNING_OBJECT (task, "starting task without a lock");
       
   437     GST_OBJECT_UNLOCK (task);
       
   438     g_warning ("starting task without a lock");
       
   439     return FALSE;
       
   440   }
       
   441 }
       
   442 
       
   443 /**
       
   444  * gst_task_stop:
       
   445  * @task: The #GstTask to stop
       
   446  *
       
   447  * Stops @task. This method merely schedules the task to stop and
       
   448  * will not wait for the task to have completely stopped. Use
       
   449  * gst_task_join() to stop and wait for completion.
       
   450  *
       
   451  * Returns: TRUE if the task could be stopped.
       
   452  *
       
   453  * MT safe.
       
   454  */
       
   455 #ifdef __SYMBIAN32__
       
   456 EXPORT_C
       
   457 #endif
       
   458 
       
   459 gboolean
       
   460 gst_task_stop (GstTask * task)
       
   461 {
       
   462   GstTaskClass *tclass;
       
   463   GstTaskState old;
       
   464 
       
   465   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
       
   466 
       
   467   tclass = GST_TASK_GET_CLASS (task);
       
   468 
       
   469   GST_DEBUG_OBJECT (task, "Stopping task %p", task);
       
   470 
       
   471   GST_OBJECT_LOCK (task);
       
   472   old = task->state;
       
   473   task->state = GST_TASK_STOPPED;
       
   474   switch (old) {
       
   475     case GST_TASK_STOPPED:
       
   476       break;
       
   477     case GST_TASK_PAUSED:
       
   478       GST_TASK_SIGNAL (task);
       
   479       break;
       
   480     case GST_TASK_STARTED:
       
   481       break;
       
   482   }
       
   483   GST_OBJECT_UNLOCK (task);
       
   484 
       
   485   return TRUE;
       
   486 }
       
   487 
       
   488 /**
       
   489  * gst_task_pause:
       
   490  * @task: The #GstTask to pause
       
   491  *
       
   492  * Pauses @task. This method can also be called on a task in the
       
   493  * stopped state, in which case a thread will be started and will remain
       
   494  * in the paused state. This function does not wait for the task to complete
       
   495  * the paused state.
       
   496  *
       
   497  * Returns: TRUE if the task could be paused.
       
   498  *
       
   499  * MT safe.
       
   500  */
       
   501 #ifdef __SYMBIAN32__
       
   502 EXPORT_C
       
   503 #endif 
       
   504 gboolean
       
   505 gst_task_pause (GstTask * task)
       
   506 {
       
   507   GstTaskState old;
       
   508 
       
   509   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
       
   510 
       
   511   GST_DEBUG_OBJECT (task, "Pausing task %p", task);
       
   512 
       
   513   GST_OBJECT_LOCK (task);
       
   514   if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL))
       
   515     goto no_lock;
       
   516 
       
   517   old = task->state;
       
   518   task->state = GST_TASK_PAUSED;
       
   519   switch (old) {
       
   520     case GST_TASK_STOPPED:
       
   521     {
       
   522       GstTaskClass *tclass;
       
   523 
       
   524       if (task->running)
       
   525         break;
       
   526 
       
   527       gst_object_ref (task);
       
   528       task->running = TRUE;
       
   529 
       
   530       tclass = GST_TASK_GET_CLASS (task);
       
   531 
       
   532       g_static_mutex_lock (&pool_lock);
       
   533       g_thread_pool_push (tclass->pool, task, NULL);
       
   534       g_static_mutex_unlock (&pool_lock);
       
   535       break;
       
   536     }
       
   537     case GST_TASK_PAUSED:
       
   538       break;
       
   539     case GST_TASK_STARTED:
       
   540       break;
       
   541   }
       
   542   GST_OBJECT_UNLOCK (task);
       
   543 
       
   544   return TRUE;
       
   545 
       
   546   /* ERRORS */
       
   547 no_lock:
       
   548   {
       
   549     GST_WARNING_OBJECT (task, "pausing task without a lock");
       
   550     GST_OBJECT_UNLOCK (task);
       
   551     g_warning ("pausing task without a lock");
       
   552     return FALSE;
       
   553   }
       
   554 }
       
   555 
       
   556 /**
       
   557  * gst_task_join:
       
   558  * @task: The #GstTask to join
       
   559  *
       
   560  * Joins @task. After this call, it is safe to unref the task
       
   561  * and clean up the lock set with gst_task_set_lock().
       
   562  *
       
   563  * The task will automatically be stopped with this call.
       
   564  *
       
   565  * This function cannot be called from within a task function as this
       
   566  * would cause a deadlock. The function will detect this and print a 
       
   567  * g_warning.
       
   568  *
       
   569  * Returns: TRUE if the task could be joined.
       
   570  *
       
   571  * MT safe.
       
   572  */
       
   573 #ifdef __SYMBIAN32__
       
   574 EXPORT_C
       
   575 #endif
       
   576 
       
   577 gboolean
       
   578 gst_task_join (GstTask * task)
       
   579 {
       
   580   GThread *tself;
       
   581 
       
   582   g_return_val_if_fail (GST_IS_TASK (task), FALSE);
       
   583 
       
   584   tself = g_thread_self ();
       
   585 
       
   586   GST_DEBUG_OBJECT (task, "Joining task %p, thread %p", task, tself);
       
   587 
       
   588   /* we don't use a real thread join here because we are using
       
   589    * threadpools */
       
   590   GST_OBJECT_LOCK (task);
       
   591   if (G_UNLIKELY (tself == task->abidata.ABI.thread))
       
   592     goto joining_self;
       
   593   task->state = GST_TASK_STOPPED;
       
   594   /* signal the state change for when it was blocked in PAUSED. */
       
   595   GST_TASK_SIGNAL (task);
       
   596   /* we set the running flag when pushing the task on the threadpool. 
       
   597    * This means that the task function might not be called when we try
       
   598    * to join it here. */
       
   599   while (G_LIKELY (task->running))
       
   600     GST_TASK_WAIT (task);
       
   601   GST_OBJECT_UNLOCK (task);
       
   602 
       
   603   GST_DEBUG_OBJECT (task, "Joined task %p", task);
       
   604 
       
   605   return TRUE;
       
   606 
       
   607   /* ERRORS */
       
   608 joining_self:
       
   609   {
       
   610     GST_WARNING_OBJECT (task, "trying to join task from its thread");
       
   611     GST_OBJECT_UNLOCK (task);
       
   612     g_warning ("\nTrying to join task %p from its thread would deadlock.\n"
       
   613         "You cannot change the state of an element from its streaming\n"
       
   614         "thread. Use g_idle_add() or post a GstMessage on the bus to\n"
       
   615         "schedule the state change from the main thread.\n", task);
       
   616     return FALSE;
       
   617   }
       
   618 }