|
1 /* GStreamer |
|
2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu> |
|
3 * 2005 Wim Taymans <wim@fluendo.com> |
|
4 * |
|
5 * gsttask.c: Streaming tasks |
|
6 * |
|
7 * This library is free software; you can redistribute it and/or |
|
8 * modify it under the terms of the GNU Library General Public |
|
9 * License as published by the Free Software Foundation; either |
|
10 * version 2 of the License, or (at your option) any later version. |
|
11 * |
|
12 * This library is distributed in the hope that it will be useful, |
|
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
|
15 * Library General Public License for more details. |
|
16 * |
|
17 * You should have received a copy of the GNU Library General Public |
|
18 * License along with this library; if not, write to the |
|
19 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, |
|
20 * Boston, MA 02111-1307, USA. |
|
21 */ |
|
22 |
|
23 /** |
|
24 * SECTION:gsttask |
|
25 * @short_description: Abstraction of GStreamer streaming threads. |
|
26 * @see_also: #GstElement, #GstPad |
|
27 * |
|
28 * #GstTask is used by #GstElement and #GstPad to provide the data passing |
|
29 * threads in a #GstPipeline. |
|
30 * |
|
31 * A #GstPad will typically start a #GstTask to push or pull data to/from the |
|
32 * peer pads. Most source elements start a #GstTask to push data. In some cases |
|
33 * a demuxer element can start a #GstTask to pull data from a peer element. This |
|
34 * is typically done when the demuxer can perform random access on the upstream |
|
35 * peer element for improved performance. |
|
36 * |
|
37 * Although convenience functions exist on #GstPad to start/pause/stop tasks, it |
|
38 * might sometimes be needed to create a #GstTask manually if it is not related to |
|
39 * a #GstPad. |
|
40 * |
|
41 * Before the #GstTask can be run, it needs a #GStaticRecMutex that can be set with |
|
42 * gst_task_set_lock(). |
|
43 * |
|
44 * The task can be started, paused and stopped with gst_task_start(), gst_task_pause() |
|
45 * and gst_task_stop() respectively. |
|
46 * |
|
47 * A #GstTask will repeatedly call the #GstTaskFunction with the user data |
|
48 * that was provided when creating the task with gst_task_create(). Before calling |
|
49 * the function it will acquire the provided lock. |
|
50 * |
|
51 * Stopping a task with gst_task_stop() will not immediately make sure the task is |
|
52 * not running anymore. Use gst_task_join() to make sure the task is completely |
|
53 * stopped and the thread is stopped. |
|
54 * |
|
55 * After creating a #GstTask, use gst_object_unref() to free its resources. This can |
|
56 * only be done if the task is not running anymore. |
|
57 * |
|
58 * Last reviewed on 2006-02-13 (0.10.4) |
|
59 */ |
|
60 |
|
61 #include "gst_private.h" |
|
62 |
|
63 #include "gstinfo.h" |
|
64 #include "gsttask.h" |
|
65 |
|
66 #ifdef __SYMBIAN32__ |
|
67 #include <glib_global.h> |
|
68 #endif |
|
69 |
|
/* Debug category used by the GST_DEBUG / GST_WARNING calls in this file; it is
 * initialized under the name "task" the first time gst_task_get_type() runs. */
70 GST_DEBUG_CATEGORY_STATIC (task_debug); |
|
71 #define GST_CAT_DEFAULT (task_debug) |
|
72 |
|
/* GObject boilerplate: class initializer, instance initializer and finalizer
 * referenced from the GTypeInfo / class setup further down. */
73 static void gst_task_class_init (GstTaskClass * klass); |
|
74 static void gst_task_init (GstTask * task); |
|
75 static void gst_task_finalize (GObject * object); |
|
76 |
|
/* Worker function handed to the class thread pool; it is called with the
 * pushed task and the class as user data. */
77 static void gst_task_func (GstTask * task, GstTaskClass * tclass); |
|
78 |
|
/* Parent class, stored by gst_task_class_init() and used by
 * gst_task_finalize() to chain up. */
79 static GstObjectClass *parent_class = NULL; |
|
80 |
|
/* Held around every use of the class thread pool in this file: pushing tasks
 * in gst_task_start() / gst_task_pause() and replacing the pool in
 * gst_task_cleanup_all(). */
81 static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT; |
|
82 #ifdef __SYMBIAN32__ |
|
83 EXPORT_C |
|
84 #endif |
|
85 |
|
86 |
|
87 GType |
|
88 gst_task_get_type (void) |
|
89 { |
|
90 static GType _gst_task_type = 0; |
|
91 |
|
92 if (G_UNLIKELY (_gst_task_type == 0)) { |
|
93 static const GTypeInfo task_info = { |
|
94 sizeof (GstTaskClass), |
|
95 NULL, |
|
96 NULL, |
|
97 (GClassInitFunc) gst_task_class_init, |
|
98 NULL, |
|
99 NULL, |
|
100 sizeof (GstTask), |
|
101 0, |
|
102 (GInstanceInitFunc) gst_task_init, |
|
103 NULL |
|
104 }; |
|
105 |
|
106 _gst_task_type = |
|
107 g_type_register_static (GST_TYPE_OBJECT, "GstTask", &task_info, 0); |
|
108 |
|
109 GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks"); |
|
110 } |
|
111 return _gst_task_type; |
|
112 } |
|
113 |
|
114 static void |
|
115 gst_task_class_init (GstTaskClass * klass) |
|
116 { |
|
117 GObjectClass *gobject_class; |
|
118 |
|
119 gobject_class = (GObjectClass *) klass; |
|
120 |
|
121 parent_class = g_type_class_peek_parent (klass); |
|
122 |
|
123 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_task_finalize); |
|
124 |
|
125 klass->pool = g_thread_pool_new ( |
|
126 (GFunc) gst_task_func, klass, -1, FALSE, NULL); |
|
127 } |
|
128 |
|
129 static void |
|
130 gst_task_init (GstTask * task) |
|
131 { |
|
132 task->running = FALSE; |
|
133 task->abidata.ABI.thread = NULL; |
|
134 task->lock = NULL; |
|
135 task->cond = g_cond_new (); |
|
136 task->state = GST_TASK_STOPPED; |
|
137 } |
|
138 |
|
139 static void |
|
140 gst_task_finalize (GObject * object) |
|
141 { |
|
142 GstTask *task = GST_TASK (object); |
|
143 |
|
144 GST_DEBUG ("task %p finalize", task); |
|
145 |
|
146 /* task thread cannot be running here since it holds a ref |
|
147 * to the task so that the finalize could not have happened */ |
|
148 g_cond_free (task->cond); |
|
149 task->cond = NULL; |
|
150 |
|
151 G_OBJECT_CLASS (parent_class)->finalize (object); |
|
152 } |
|
153 |
|
154 static void |
|
155 gst_task_func (GstTask * task, GstTaskClass * tclass) |
|
156 { |
|
157 GStaticRecMutex *lock; |
|
158 GThread *tself; |
|
159 |
|
160 tself = g_thread_self (); |
|
161 |
|
162 GST_DEBUG ("Entering task %p, thread %p", task, tself); |
|
163 |
|
164 /* we have to grab the lock to get the mutex. We also |
|
165 * mark our state running so that nobody can mess with |
|
166 * the mutex. */ |
|
167 GST_OBJECT_LOCK (task); |
|
168 if (task->state == GST_TASK_STOPPED) |
|
169 goto exit; |
|
170 lock = GST_TASK_GET_LOCK (task); |
|
171 if (G_UNLIKELY (lock == NULL)) |
|
172 goto no_lock; |
|
173 task->abidata.ABI.thread = tself; |
|
174 GST_OBJECT_UNLOCK (task); |
|
175 |
|
176 /* locking order is TASK_LOCK, LOCK */ |
|
177 g_static_rec_mutex_lock (lock); |
|
178 GST_OBJECT_LOCK (task); |
|
179 while (G_LIKELY (task->state != GST_TASK_STOPPED)) { |
|
180 while (G_UNLIKELY (task->state == GST_TASK_PAUSED)) { |
|
181 gint t; |
|
182 |
|
183 t = g_static_rec_mutex_unlock_full (lock); |
|
184 if (t <= 0) { |
|
185 g_warning ("wrong STREAM_LOCK count %d", t); |
|
186 } |
|
187 GST_TASK_SIGNAL (task); |
|
188 GST_TASK_WAIT (task); |
|
189 GST_OBJECT_UNLOCK (task); |
|
190 /* locking order.. */ |
|
191 if (t > 0) |
|
192 g_static_rec_mutex_lock_full (lock, t); |
|
193 |
|
194 GST_OBJECT_LOCK (task); |
|
195 if (G_UNLIKELY (task->state == GST_TASK_STOPPED)) |
|
196 goto done; |
|
197 } |
|
198 GST_OBJECT_UNLOCK (task); |
|
199 |
|
200 task->func (task->data); |
|
201 |
|
202 GST_OBJECT_LOCK (task); |
|
203 } |
|
204 done: |
|
205 GST_OBJECT_UNLOCK (task); |
|
206 g_static_rec_mutex_unlock (lock); |
|
207 |
|
208 GST_OBJECT_LOCK (task); |
|
209 task->abidata.ABI.thread = NULL; |
|
210 |
|
211 exit: |
|
212 /* now we allow messing with the lock again by setting the running flag to |
|
213 * FALSE. Together with the SIGNAL this is the sign for the _join() to |
|
214 * complete. |
|
215 * Note that we still have not dropped the final ref on the task. We could |
|
216 * check here if there is a pending join() going on and drop the last ref |
|
217 * before releasing the lock as we can be sure that a ref is held by the |
|
218 * caller of the join(). */ |
|
219 task->running = FALSE; |
|
220 GST_TASK_SIGNAL (task); |
|
221 GST_OBJECT_UNLOCK (task); |
|
222 |
|
223 GST_DEBUG ("Exit task %p, thread %p", task, g_thread_self ()); |
|
224 |
|
225 gst_object_unref (task); |
|
226 return; |
|
227 |
|
228 no_lock: |
|
229 { |
|
230 g_warning ("starting task without a lock"); |
|
231 goto exit; |
|
232 } |
|
233 } |
|
234 |
|
235 /** |
|
236 * gst_task_cleanup_all: |
|
237 * |
|
238 * Wait for all tasks to be stopped. This is mainly used internally |
|
239 * to ensure proper cleanup of internal datastructures in testsuites. |
|
240 * |
|
241 * MT safe. |
|
242 */ |
|
243 #ifdef __SYMBIAN32__ |
|
244 EXPORT_C |
|
245 #endif |
|
246 |
|
247 void |
|
248 gst_task_cleanup_all (void) |
|
249 { |
|
250 GstTaskClass *klass; |
|
251 |
|
252 if ((klass = g_type_class_peek (GST_TYPE_TASK))) { |
|
253 g_static_mutex_lock (&pool_lock); |
|
254 if (klass->pool) { |
|
255 /* Shut down all the threads, we still process the ones scheduled |
|
256 * because the unref happens in the thread function. |
|
257 * Also wait for currently running ones to finish. */ |
|
258 g_thread_pool_free (klass->pool, FALSE, TRUE); |
|
259 /* create new pool, so we can still do something after this |
|
260 * call. */ |
|
261 klass->pool = g_thread_pool_new ( |
|
262 (GFunc) gst_task_func, klass, -1, FALSE, NULL); |
|
263 } |
|
264 g_static_mutex_unlock (&pool_lock); |
|
265 } |
|
266 } |
|
267 |
|
268 /** |
|
269 * gst_task_create: |
|
270 * @func: The #GstTaskFunction to use |
|
271 * @data: User data to pass to @func |
|
272 * |
|
273 * Create a new Task that will repeadedly call the provided @func |
|
274 * with @data as a parameter. Typically the task will run in |
|
275 * a new thread. |
|
276 * |
|
277 * The function cannot be changed after the task has been created. You |
|
278 * must create a new GstTask to change the function. |
|
279 * |
|
280 * Returns: A new #GstTask. |
|
281 * |
|
282 * MT safe. |
|
283 */ |
|
284 #ifdef __SYMBIAN32__ |
|
285 EXPORT_C |
|
286 #endif |
|
287 |
|
288 GstTask * |
|
289 gst_task_create (GstTaskFunction func, gpointer data) |
|
290 { |
|
291 GstTask *task; |
|
292 |
|
293 task = g_object_new (GST_TYPE_TASK, NULL); |
|
294 task->func = func; |
|
295 task->data = data; |
|
296 |
|
297 GST_DEBUG ("Created task %p", task); |
|
298 |
|
299 return task; |
|
300 } |
|
301 |
|
302 /** |
|
303 * gst_task_set_lock: |
|
304 * @task: The #GstTask to use |
|
305 * @mutex: The GMutex to use |
|
306 * |
|
307 * Set the mutex used by the task. The mutex will be acquired before |
|
308 * calling the #GstTaskFunction. |
|
309 * |
|
310 * This function has to be called before calling gst_task_pause() or |
|
311 * gst_task_start(). |
|
312 * |
|
313 * MT safe. |
|
314 */ |
|
315 #ifdef __SYMBIAN32__ |
|
316 EXPORT_C |
|
317 #endif |
|
318 |
|
319 void |
|
320 gst_task_set_lock (GstTask * task, GStaticRecMutex * mutex) |
|
321 { |
|
322 GST_OBJECT_LOCK (task); |
|
323 if (G_UNLIKELY (task->running)) |
|
324 goto is_running; |
|
325 GST_TASK_GET_LOCK (task) = mutex; |
|
326 GST_OBJECT_UNLOCK (task); |
|
327 |
|
328 return; |
|
329 |
|
330 /* ERRORS */ |
|
331 is_running: |
|
332 { |
|
333 GST_OBJECT_UNLOCK (task); |
|
334 g_warning ("cannot call set_lock on a running task"); |
|
335 } |
|
336 } |
|
337 |
|
338 |
|
339 /** |
|
340 * gst_task_get_state: |
|
341 * @task: The #GstTask to query |
|
342 * |
|
343 * Get the current state of the task. |
|
344 * |
|
345 * Returns: The #GstTaskState of the task |
|
346 * |
|
347 * MT safe. |
|
348 */ |
|
349 #ifdef __SYMBIAN32__ |
|
350 EXPORT_C |
|
351 #endif |
|
352 |
|
353 GstTaskState |
|
354 gst_task_get_state (GstTask * task) |
|
355 { |
|
356 GstTaskState result; |
|
357 |
|
358 g_return_val_if_fail (GST_IS_TASK (task), GST_TASK_STOPPED); |
|
359 |
|
360 GST_OBJECT_LOCK (task); |
|
361 result = task->state; |
|
362 GST_OBJECT_UNLOCK (task); |
|
363 |
|
364 return result; |
|
365 } |
|
366 |
|
367 /** |
|
368 * gst_task_start: |
|
369 * @task: The #GstTask to start |
|
370 * |
|
371 * Starts @task. The @task must have a lock associated with it using |
|
372 * gst_task_set_lock() or thsi function will return FALSE. |
|
373 * |
|
374 * Returns: TRUE if the task could be started. |
|
375 * |
|
376 * MT safe. |
|
377 */ |
|
378 #ifdef __SYMBIAN32__ |
|
379 EXPORT_C |
|
380 #endif |
|
381 |
|
382 gboolean |
|
383 gst_task_start (GstTask * task) |
|
384 { |
|
385 GstTaskState old; |
|
386 |
|
387 g_return_val_if_fail (GST_IS_TASK (task), FALSE); |
|
388 |
|
389 GST_DEBUG_OBJECT (task, "Starting task %p", task); |
|
390 |
|
391 GST_OBJECT_LOCK (task); |
|
392 if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL)) |
|
393 goto no_lock; |
|
394 |
|
395 old = task->state; |
|
396 task->state = GST_TASK_STARTED; |
|
397 switch (old) { |
|
398 case GST_TASK_STOPPED: |
|
399 { |
|
400 GstTaskClass *tclass; |
|
401 |
|
402 /* If the task already has a thread scheduled we don't have to do |
|
403 * anything. */ |
|
404 if (task->running) |
|
405 break; |
|
406 |
|
407 /* new task, push on threadpool. We ref before so |
|
408 * that it remains alive while on the threadpool. */ |
|
409 gst_object_ref (task); |
|
410 /* mark task as running so that a join will wait until we schedule |
|
411 * and exit the task function. */ |
|
412 task->running = TRUE; |
|
413 |
|
414 tclass = GST_TASK_GET_CLASS (task); |
|
415 |
|
416 g_static_mutex_lock (&pool_lock); |
|
417 g_thread_pool_push (tclass->pool, task, NULL); |
|
418 g_static_mutex_unlock (&pool_lock); |
|
419 break; |
|
420 } |
|
421 case GST_TASK_PAUSED: |
|
422 /* PAUSE to PLAY, signal */ |
|
423 GST_TASK_SIGNAL (task); |
|
424 break; |
|
425 case GST_TASK_STARTED: |
|
426 /* was OK */ |
|
427 break; |
|
428 } |
|
429 GST_OBJECT_UNLOCK (task); |
|
430 |
|
431 return TRUE; |
|
432 |
|
433 /* ERRORS */ |
|
434 no_lock: |
|
435 { |
|
436 GST_WARNING_OBJECT (task, "starting task without a lock"); |
|
437 GST_OBJECT_UNLOCK (task); |
|
438 g_warning ("starting task without a lock"); |
|
439 return FALSE; |
|
440 } |
|
441 } |
|
442 |
|
443 /** |
|
444 * gst_task_stop: |
|
445 * @task: The #GstTask to stop |
|
446 * |
|
447 * Stops @task. This method merely schedules the task to stop and |
|
448 * will not wait for the task to have completely stopped. Use |
|
449 * gst_task_join() to stop and wait for completion. |
|
450 * |
|
451 * Returns: TRUE if the task could be stopped. |
|
452 * |
|
453 * MT safe. |
|
454 */ |
|
455 #ifdef __SYMBIAN32__ |
|
456 EXPORT_C |
|
457 #endif |
|
458 |
|
459 gboolean |
|
460 gst_task_stop (GstTask * task) |
|
461 { |
|
462 GstTaskClass *tclass; |
|
463 GstTaskState old; |
|
464 |
|
465 g_return_val_if_fail (GST_IS_TASK (task), FALSE); |
|
466 |
|
467 tclass = GST_TASK_GET_CLASS (task); |
|
468 |
|
469 GST_DEBUG_OBJECT (task, "Stopping task %p", task); |
|
470 |
|
471 GST_OBJECT_LOCK (task); |
|
472 old = task->state; |
|
473 task->state = GST_TASK_STOPPED; |
|
474 switch (old) { |
|
475 case GST_TASK_STOPPED: |
|
476 break; |
|
477 case GST_TASK_PAUSED: |
|
478 GST_TASK_SIGNAL (task); |
|
479 break; |
|
480 case GST_TASK_STARTED: |
|
481 break; |
|
482 } |
|
483 GST_OBJECT_UNLOCK (task); |
|
484 |
|
485 return TRUE; |
|
486 } |
|
487 |
|
488 /** |
|
489 * gst_task_pause: |
|
490 * @task: The #GstTask to pause |
|
491 * |
|
492 * Pauses @task. This method can also be called on a task in the |
|
493 * stopped state, in which case a thread will be started and will remain |
|
494 * in the paused state. This function does not wait for the task to complete |
|
495 * the paused state. |
|
496 * |
|
497 * Returns: TRUE if the task could be paused. |
|
498 * |
|
499 * MT safe. |
|
500 */ |
|
501 #ifdef __SYMBIAN32__ |
|
502 EXPORT_C |
|
503 #endif |
|
504 gboolean |
|
505 gst_task_pause (GstTask * task) |
|
506 { |
|
507 GstTaskState old; |
|
508 |
|
509 g_return_val_if_fail (GST_IS_TASK (task), FALSE); |
|
510 |
|
511 GST_DEBUG_OBJECT (task, "Pausing task %p", task); |
|
512 |
|
513 GST_OBJECT_LOCK (task); |
|
514 if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL)) |
|
515 goto no_lock; |
|
516 |
|
517 old = task->state; |
|
518 task->state = GST_TASK_PAUSED; |
|
519 switch (old) { |
|
520 case GST_TASK_STOPPED: |
|
521 { |
|
522 GstTaskClass *tclass; |
|
523 |
|
524 if (task->running) |
|
525 break; |
|
526 |
|
527 gst_object_ref (task); |
|
528 task->running = TRUE; |
|
529 |
|
530 tclass = GST_TASK_GET_CLASS (task); |
|
531 |
|
532 g_static_mutex_lock (&pool_lock); |
|
533 g_thread_pool_push (tclass->pool, task, NULL); |
|
534 g_static_mutex_unlock (&pool_lock); |
|
535 break; |
|
536 } |
|
537 case GST_TASK_PAUSED: |
|
538 break; |
|
539 case GST_TASK_STARTED: |
|
540 break; |
|
541 } |
|
542 GST_OBJECT_UNLOCK (task); |
|
543 |
|
544 return TRUE; |
|
545 |
|
546 /* ERRORS */ |
|
547 no_lock: |
|
548 { |
|
549 GST_WARNING_OBJECT (task, "pausing task without a lock"); |
|
550 GST_OBJECT_UNLOCK (task); |
|
551 g_warning ("pausing task without a lock"); |
|
552 return FALSE; |
|
553 } |
|
554 } |
|
555 |
|
556 /** |
|
557 * gst_task_join: |
|
558 * @task: The #GstTask to join |
|
559 * |
|
560 * Joins @task. After this call, it is safe to unref the task |
|
561 * and clean up the lock set with gst_task_set_lock(). |
|
562 * |
|
563 * The task will automatically be stopped with this call. |
|
564 * |
|
565 * This function cannot be called from within a task function as this |
|
566 * would cause a deadlock. The function will detect this and print a |
|
567 * g_warning. |
|
568 * |
|
569 * Returns: TRUE if the task could be joined. |
|
570 * |
|
571 * MT safe. |
|
572 */ |
|
573 #ifdef __SYMBIAN32__ |
|
574 EXPORT_C |
|
575 #endif |
|
576 |
|
577 gboolean |
|
578 gst_task_join (GstTask * task) |
|
579 { |
|
580 GThread *tself; |
|
581 |
|
582 g_return_val_if_fail (GST_IS_TASK (task), FALSE); |
|
583 |
|
584 tself = g_thread_self (); |
|
585 |
|
586 GST_DEBUG_OBJECT (task, "Joining task %p, thread %p", task, tself); |
|
587 |
|
588 /* we don't use a real thread join here because we are using |
|
589 * threadpools */ |
|
590 GST_OBJECT_LOCK (task); |
|
591 if (G_UNLIKELY (tself == task->abidata.ABI.thread)) |
|
592 goto joining_self; |
|
593 task->state = GST_TASK_STOPPED; |
|
594 /* signal the state change for when it was blocked in PAUSED. */ |
|
595 GST_TASK_SIGNAL (task); |
|
596 /* we set the running flag when pushing the task on the threadpool. |
|
597 * This means that the task function might not be called when we try |
|
598 * to join it here. */ |
|
599 while (G_LIKELY (task->running)) |
|
600 GST_TASK_WAIT (task); |
|
601 GST_OBJECT_UNLOCK (task); |
|
602 |
|
603 GST_DEBUG_OBJECT (task, "Joined task %p", task); |
|
604 |
|
605 return TRUE; |
|
606 |
|
607 /* ERRORS */ |
|
608 joining_self: |
|
609 { |
|
610 GST_WARNING_OBJECT (task, "trying to join task from its thread"); |
|
611 GST_OBJECT_UNLOCK (task); |
|
612 g_warning ("\nTrying to join task %p from its thread would deadlock.\n" |
|
613 "You cannot change the state of an element from its streaming\n" |
|
614 "thread. Use g_idle_add() or post a GstMessage on the bus to\n" |
|
615 "schedule the state change from the main thread.\n", task); |
|
616 return FALSE; |
|
617 } |
|
618 } |