40 * |
40 * |
41 * Before the #GstTask can be run, it needs a #GStaticRecMutex that can be set with |
41 * Before the #GstTask can be run, it needs a #GStaticRecMutex that can be set with |
42 * gst_task_set_lock(). |
42 * gst_task_set_lock(). |
43 * |
43 * |
44 * The task can be started, paused and stopped with gst_task_start(), gst_task_pause() |
44 * The task can be started, paused and stopped with gst_task_start(), gst_task_pause() |
45 * and gst_task_stop() respectively. |
45 * and gst_task_stop() respectively or with the gst_task_set_state() function. |
46 * |
46 * |
47 * A #GstTask will repeadedly call the #GstTaskFunction with the user data |
47 * A #GstTask will repeatedly call the #GstTaskFunction with the user data |
48 * that was provided when creating the task with gst_task_create(). Before calling |
48 * that was provided when creating the task with gst_task_create(). While calling |
49 * the function it will acquire the provided lock. |
49 * the function it will acquire the provided lock. The provided lock is released |
50 * |
50 * when the task pauses or stops. |
51 * Stopping a task with gst_task_stop() will not immediatly make sure the task is |
51 * |
|
52 * Stopping a task with gst_task_stop() will not immediately make sure the task is |
52 * not running anymore. Use gst_task_join() to make sure the task is completely |
53 * not running anymore. Use gst_task_join() to make sure the task is completely |
53 * stopped and the thread is stopped. |
54 * stopped and the thread is stopped. |
54 * |
55 * |
55 * After creating a #GstTask, use gst_object_unref() to free its resources. This can |
56 * After creating a #GstTask, use gst_object_unref() to free its resources. This can |
56 * only be done it the task is not running anymore. |
57 * only be done it the task is not running anymore. |
67 #include <glib_global.h> |
68 #include <glib_global.h> |
68 #endif |
69 #endif |
69 |
70 |
70 GST_DEBUG_CATEGORY_STATIC (task_debug); |
71 GST_DEBUG_CATEGORY_STATIC (task_debug); |
71 #define GST_CAT_DEFAULT (task_debug) |
72 #define GST_CAT_DEFAULT (task_debug) |
|
73 |
|
74 #define GST_TASK_GET_PRIVATE(obj) \ |
|
75 (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_TASK, GstTaskPrivate)) |
|
76 |
|
77 struct _GstTaskPrivate |
|
78 { |
|
79 /* callbacks for managing the thread of this task */ |
|
80 GstTaskThreadCallbacks thr_callbacks; |
|
81 gpointer thr_user_data; |
|
82 GDestroyNotify thr_notify; |
|
83 |
|
84 gboolean prio_set; |
|
85 GThreadPriority priority; |
|
86 |
|
87 /* configured pool */ |
|
88 GstTaskPool *pool; |
|
89 |
|
90 /* remember the pool and id that is currently running. */ |
|
91 gpointer id; |
|
92 GstTaskPool *pool_id; |
|
93 }; |
72 |
94 |
73 static void gst_task_class_init (GstTaskClass * klass); |
95 static void gst_task_class_init (GstTaskClass * klass); |
74 static void gst_task_init (GstTask * task); |
96 static void gst_task_init (GstTask * task); |
75 static void gst_task_finalize (GObject * object); |
97 static void gst_task_finalize (GObject * object); |
76 |
98 |
77 static void gst_task_func (GstTask * task, GstTaskClass * tclass); |
99 static void gst_task_func (GstTask * task); |
78 |
|
79 static GstObjectClass *parent_class = NULL; |
|
80 |
100 |
81 static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT; |
101 static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT; |
82 #ifdef __SYMBIAN32__ |
102 |
83 EXPORT_C |
103 #define _do_init \ |
84 #endif |
104 { \ |
85 |
105 GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks"); \ |
86 |
106 } |
87 GType |
107 |
88 gst_task_get_type (void) |
108 G_DEFINE_TYPE_WITH_CODE (GstTask, gst_task, GST_TYPE_OBJECT, _do_init); |
89 { |
109 |
90 static GType _gst_task_type = 0; |
110 static void |
91 |
111 init_klass_pool (GstTaskClass * klass) |
92 if (G_UNLIKELY (_gst_task_type == 0)) { |
112 { |
93 static const GTypeInfo task_info = { |
113 g_static_mutex_lock (&pool_lock); |
94 sizeof (GstTaskClass), |
114 if (klass->pool) { |
95 NULL, |
115 gst_task_pool_cleanup (klass->pool); |
96 NULL, |
116 gst_object_unref (klass->pool); |
97 (GClassInitFunc) gst_task_class_init, |
117 } |
98 NULL, |
118 klass->pool = gst_task_pool_new (); |
99 NULL, |
119 gst_task_pool_prepare (klass->pool, NULL); |
100 sizeof (GstTask), |
120 g_static_mutex_unlock (&pool_lock); |
101 0, |
|
102 (GInstanceInitFunc) gst_task_init, |
|
103 NULL |
|
104 }; |
|
105 |
|
106 _gst_task_type = |
|
107 g_type_register_static (GST_TYPE_OBJECT, "GstTask", &task_info, 0); |
|
108 |
|
109 GST_DEBUG_CATEGORY_INIT (task_debug, "task", 0, "Processing tasks"); |
|
110 } |
|
111 return _gst_task_type; |
|
112 } |
121 } |
113 |
122 |
114 static void |
123 static void |
115 gst_task_class_init (GstTaskClass * klass) |
124 gst_task_class_init (GstTaskClass * klass) |
116 { |
125 { |
117 GObjectClass *gobject_class; |
126 GObjectClass *gobject_class; |
118 |
127 |
119 gobject_class = (GObjectClass *) klass; |
128 gobject_class = (GObjectClass *) klass; |
120 |
129 |
121 parent_class = g_type_class_peek_parent (klass); |
130 g_type_class_add_private (klass, sizeof (GstTaskPrivate)); |
122 |
131 |
123 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_task_finalize); |
132 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_task_finalize); |
124 |
133 |
125 klass->pool = g_thread_pool_new ( |
134 init_klass_pool (klass); |
126 (GFunc) gst_task_func, klass, -1, FALSE, NULL); |
|
127 } |
135 } |
128 |
136 |
129 static void |
137 static void |
130 gst_task_init (GstTask * task) |
138 gst_task_init (GstTask * task) |
131 { |
139 { |
|
140 GstTaskClass *klass; |
|
141 |
|
142 klass = GST_TASK_GET_CLASS (task); |
|
143 |
|
144 task->priv = GST_TASK_GET_PRIVATE (task); |
132 task->running = FALSE; |
145 task->running = FALSE; |
133 task->abidata.ABI.thread = NULL; |
146 task->abidata.ABI.thread = NULL; |
134 task->lock = NULL; |
147 task->lock = NULL; |
135 task->cond = g_cond_new (); |
148 task->cond = g_cond_new (); |
136 task->state = GST_TASK_STOPPED; |
149 task->state = GST_TASK_STOPPED; |
|
150 task->priv->prio_set = FALSE; |
|
151 |
|
152 /* use the default klass pool for this task, users can |
|
153 * override this later */ |
|
154 g_static_mutex_lock (&pool_lock); |
|
155 task->priv->pool = gst_object_ref (klass->pool); |
|
156 g_static_mutex_unlock (&pool_lock); |
137 } |
157 } |
138 |
158 |
139 static void |
159 static void |
140 gst_task_finalize (GObject * object) |
160 gst_task_finalize (GObject * object) |
141 { |
161 { |
142 GstTask *task = GST_TASK (object); |
162 GstTask *task = GST_TASK (object); |
|
163 GstTaskPrivate *priv = task->priv; |
143 |
164 |
144 GST_DEBUG ("task %p finalize", task); |
165 GST_DEBUG ("task %p finalize", task); |
|
166 |
|
167 if (priv->thr_notify) |
|
168 priv->thr_notify (priv->thr_user_data); |
|
169 priv->thr_notify = NULL; |
|
170 priv->thr_user_data = NULL; |
|
171 |
|
172 gst_object_unref (priv->pool); |
145 |
173 |
146 /* task thread cannot be running here since it holds a ref |
174 /* task thread cannot be running here since it holds a ref |
147 * to the task so that the finalize could not have happened */ |
175 * to the task so that the finalize could not have happened */ |
148 g_cond_free (task->cond); |
176 g_cond_free (task->cond); |
149 task->cond = NULL; |
177 task->cond = NULL; |
150 |
178 |
151 G_OBJECT_CLASS (parent_class)->finalize (object); |
179 G_OBJECT_CLASS (gst_task_parent_class)->finalize (object); |
152 } |
180 } |
153 |
181 |
154 static void |
182 static void |
155 gst_task_func (GstTask * task, GstTaskClass * tclass) |
183 gst_task_func (GstTask * task) |
156 { |
184 { |
157 GStaticRecMutex *lock; |
185 GStaticRecMutex *lock; |
158 GThread *tself; |
186 GThread *tself; |
|
187 GstTaskPrivate *priv; |
|
188 |
|
189 priv = task->priv; |
159 |
190 |
160 tself = g_thread_self (); |
191 tself = g_thread_self (); |
161 |
192 |
162 GST_DEBUG ("Entering task %p, thread %p", task, tself); |
193 GST_DEBUG ("Entering task %p, thread %p", task, tself); |
163 |
194 |
207 |
245 |
208 GST_OBJECT_LOCK (task); |
246 GST_OBJECT_LOCK (task); |
209 task->abidata.ABI.thread = NULL; |
247 task->abidata.ABI.thread = NULL; |
210 |
248 |
211 exit: |
249 exit: |
|
250 if (priv->thr_callbacks.leave_thread) { |
|
251 /* fire the leave_thread callback when we need to. We need to do this before |
|
252 * we signal the task and with the task lock released. */ |
|
253 GST_OBJECT_UNLOCK (task); |
|
254 priv->thr_callbacks.leave_thread (task, tself, priv->thr_user_data); |
|
255 GST_OBJECT_LOCK (task); |
|
256 } else { |
|
257 /* restore normal priority when releasing back into the pool, we will not |
|
258 * touch the priority when a custom callback has been installed. */ |
|
259 g_thread_set_priority (tself, G_THREAD_PRIORITY_NORMAL); |
|
260 } |
212 /* now we allow messing with the lock again by setting the running flag to |
261 /* now we allow messing with the lock again by setting the running flag to |
213 * FALSE. Together with the SIGNAL this is the sign for the _join() to |
262 * FALSE. Together with the SIGNAL this is the sign for the _join() to |
214 * complete. |
263 * complete. |
215 * Note that we still have not dropped the final ref on the task. We could |
264 * Note that we still have not dropped the final ref on the task. We could |
216 * check here if there is a pending join() going on and drop the last ref |
265 * check here if there is a pending join() going on and drop the last ref |
248 gst_task_cleanup_all (void) |
297 gst_task_cleanup_all (void) |
249 { |
298 { |
250 GstTaskClass *klass; |
299 GstTaskClass *klass; |
251 |
300 |
252 if ((klass = g_type_class_peek (GST_TYPE_TASK))) { |
301 if ((klass = g_type_class_peek (GST_TYPE_TASK))) { |
253 g_static_mutex_lock (&pool_lock); |
302 init_klass_pool (klass); |
254 if (klass->pool) { |
|
255 /* Shut down all the threads, we still process the ones scheduled |
|
256 * because the unref happens in the thread function. |
|
257 * Also wait for currently running ones to finish. */ |
|
258 g_thread_pool_free (klass->pool, FALSE, TRUE); |
|
259 /* create new pool, so we can still do something after this |
|
260 * call. */ |
|
261 klass->pool = g_thread_pool_new ( |
|
262 (GFunc) gst_task_func, klass, -1, FALSE, NULL); |
|
263 } |
|
264 g_static_mutex_unlock (&pool_lock); |
|
265 } |
303 } |
266 } |
304 } |
267 |
305 |
268 /** |
306 /** |
269 * gst_task_create: |
307 * gst_task_create: |
270 * @func: The #GstTaskFunction to use |
308 * @func: The #GstTaskFunction to use |
271 * @data: User data to pass to @func |
309 * @data: User data to pass to @func |
272 * |
310 * |
273 * Create a new Task that will repeadedly call the provided @func |
311 * Create a new Task that will repeatedly call the provided @func |
274 * with @data as a parameter. Typically the task will run in |
312 * with @data as a parameter. Typically the task will run in |
275 * a new thread. |
313 * a new thread. |
276 * |
314 * |
277 * The function cannot be changed after the task has been created. You |
315 * The function cannot be changed after the task has been created. You |
278 * must create a new GstTask to change the function. |
316 * must create a new #GstTask to change the function. |
|
317 * |
|
318 * This function will not yet create and start a thread. Use gst_task_start() or |
|
319 * gst_task_pause() to create and start the GThread. |
|
320 * |
|
321 * Before the task can be used, a #GStaticRecMutex must be configured using the |
|
322 * gst_task_set_lock() function. This lock will always be acquired while |
|
323 * @func is called. |
279 * |
324 * |
280 * Returns: A new #GstTask. |
325 * Returns: A new #GstTask. |
281 * |
326 * |
282 * MT safe. |
327 * MT safe. |
283 */ |
328 */ |
333 GST_OBJECT_UNLOCK (task); |
378 GST_OBJECT_UNLOCK (task); |
334 g_warning ("cannot call set_lock on a running task"); |
379 g_warning ("cannot call set_lock on a running task"); |
335 } |
380 } |
336 } |
381 } |
337 |
382 |
|
383 /** |
|
384 * gst_task_set_priority: |
|
385 * @task: a #GstTask |
|
386 * @priority: a new priority for @task |
|
387 * |
|
388 * Changes the priority of @task to @priority. |
|
389 * |
|
390 * Note: try not to depend on task priorities. |
|
391 * |
|
392 * MT safe. |
|
393 * |
|
394 * Since: 0.10.24 |
|
395 */ |
|
396 #ifdef __SYMBIAN32__ |
|
397 EXPORT_C |
|
398 #endif |
|
399 |
|
400 void |
|
401 gst_task_set_priority (GstTask * task, GThreadPriority priority) |
|
402 { |
|
403 GstTaskPrivate *priv; |
|
404 GThread *thread; |
|
405 |
|
406 g_return_if_fail (GST_IS_TASK (task)); |
|
407 |
|
408 priv = task->priv; |
|
409 |
|
410 GST_OBJECT_LOCK (task); |
|
411 priv->prio_set = TRUE; |
|
412 priv->priority = priority; |
|
413 thread = task->abidata.ABI.thread; |
|
414 if (thread != NULL) { |
|
415 /* if this task already has a thread, we can configure the priority right |
|
416 * away, else we do that when we assign a thread to the task. */ |
|
417 g_thread_set_priority (thread, priority); |
|
418 } |
|
419 GST_OBJECT_UNLOCK (task); |
|
420 } |
|
421 |
|
422 /** |
|
423 * gst_task_get_pool: |
|
424 * @task: a #GstTask |
|
425 * |
|
426 * Get the #GstTaskPool that this task will use for its streaming |
|
427 * threads. |
|
428 * |
|
429 * MT safe. |
|
430 * |
|
431 * Returns: the #GstTaskPool used by @task. gst_object_unref() |
|
432 * after usage. |
|
433 * |
|
434 * Since: 0.10.24 |
|
435 */ |
|
436 #ifdef __SYMBIAN32__ |
|
437 EXPORT_C |
|
438 #endif |
|
439 |
|
440 GstTaskPool * |
|
441 gst_task_get_pool (GstTask * task) |
|
442 { |
|
443 GstTaskPool *result; |
|
444 GstTaskPrivate *priv; |
|
445 |
|
446 g_return_val_if_fail (GST_IS_TASK (task), NULL); |
|
447 |
|
448 priv = task->priv; |
|
449 |
|
450 GST_OBJECT_LOCK (task); |
|
451 result = gst_object_ref (priv->pool); |
|
452 GST_OBJECT_UNLOCK (task); |
|
453 |
|
454 return result; |
|
455 } |
|
456 |
|
457 /** |
|
458 * gst_task_set_pool: |
|
459 * @task: a #GstTask |
|
460 * @pool: a #GstTaskPool |
|
461 * |
|
462 * Set @pool as the new GstTaskPool for @task. Any new streaming threads that |
|
463 * will be created by @task will now use @pool. |
|
464 * |
|
465 * MT safe. |
|
466 * |
|
467 * Since: 0.10.24 |
|
468 */ |
|
469 #ifdef __SYMBIAN32__ |
|
470 EXPORT_C |
|
471 #endif |
|
472 |
|
473 void |
|
474 gst_task_set_pool (GstTask * task, GstTaskPool * pool) |
|
475 { |
|
476 GstTaskPool *old; |
|
477 GstTaskPrivate *priv; |
|
478 |
|
479 g_return_if_fail (GST_IS_TASK (task)); |
|
480 g_return_if_fail (GST_IS_TASK_POOL (pool)); |
|
481 |
|
482 priv = task->priv; |
|
483 |
|
484 GST_OBJECT_LOCK (task); |
|
485 if (priv->pool != pool) { |
|
486 old = priv->pool; |
|
487 priv->pool = gst_object_ref (pool); |
|
488 } else |
|
489 old = NULL; |
|
490 GST_OBJECT_UNLOCK (task); |
|
491 |
|
492 if (old) |
|
493 gst_object_unref (old); |
|
494 } |
|
495 |
|
496 |
|
497 /** |
|
498 * gst_task_set_thread_callbacks: |
|
499 * @task: The #GstTask to use |
|
500 * @callbacks: a #GstTaskThreadCallbacks pointer |
|
501 * @user_data: user data passed to the callbacks |
|
502 * @notify: called when @user_data is no longer referenced |
|
503 * |
|
504 * Set callbacks which will be executed when a new thread is needed, the thread |
|
505 * function is entered and left and when the thread is joined. |
|
506 * |
|
507 * By default a thread for @task will be created from a default thread pool. |
|
508 * |
|
509 * Objects can use custom GThreads or can perform additional configuration of |
|
510 * the threads (such as changing the thread priority) by installing callbacks. |
|
511 * |
|
512 * MT safe. |
|
513 * |
|
514 * Since: 0.10.24 |
|
515 */ |
|
516 #ifdef __SYMBIAN32__ |
|
517 EXPORT_C |
|
518 #endif |
|
519 |
|
520 void |
|
521 gst_task_set_thread_callbacks (GstTask * task, |
|
522 GstTaskThreadCallbacks * callbacks, gpointer user_data, |
|
523 GDestroyNotify notify) |
|
524 { |
|
525 GDestroyNotify old_notify; |
|
526 |
|
527 g_return_if_fail (task != NULL); |
|
528 g_return_if_fail (GST_IS_TASK (task)); |
|
529 g_return_if_fail (callbacks != NULL); |
|
530 |
|
531 GST_OBJECT_LOCK (task); |
|
532 old_notify = task->priv->thr_notify; |
|
533 |
|
534 if (old_notify) { |
|
535 gpointer old_data; |
|
536 |
|
537 old_data = task->priv->thr_user_data; |
|
538 |
|
539 task->priv->thr_user_data = NULL; |
|
540 task->priv->thr_notify = NULL; |
|
541 GST_OBJECT_UNLOCK (task); |
|
542 |
|
543 old_notify (old_data); |
|
544 |
|
545 GST_OBJECT_LOCK (task); |
|
546 } |
|
547 task->priv->thr_callbacks = *callbacks; |
|
548 task->priv->thr_user_data = user_data; |
|
549 task->priv->thr_notify = notify; |
|
550 GST_OBJECT_UNLOCK (task); |
|
551 } |
338 |
552 |
339 /** |
553 /** |
340 * gst_task_get_state: |
554 * gst_task_get_state: |
341 * @task: The #GstTask to query |
555 * @task: The #GstTask to query |
342 * |
556 * |
362 GST_OBJECT_UNLOCK (task); |
576 GST_OBJECT_UNLOCK (task); |
363 |
577 |
364 return result; |
578 return result; |
365 } |
579 } |
366 |
580 |
367 /** |
581 /* make sure the task is running and start a thread if it's not. |
368 * gst_task_start: |
582 * This function must be called with the task LOCK. */ |
369 * @task: The #GstTask to start |
583 static gboolean |
370 * |
584 start_task (GstTask * task) |
371 * Starts @task. The @task must have a lock associated with it using |
585 { |
372 * gst_task_set_lock() or thsi function will return FALSE. |
586 gboolean res = TRUE; |
373 * |
587 GstTaskClass *tclass; |
374 * Returns: TRUE if the task could be started. |
588 GError *error = NULL; |
375 * |
589 GstTaskPrivate *priv; |
376 * MT safe. |
590 |
|
591 priv = task->priv; |
|
592 |
|
593 /* new task, We ref before so that it remains alive while |
|
594 * the thread is running. */ |
|
595 gst_object_ref (task); |
|
596 /* mark task as running so that a join will wait until we schedule |
|
597 * and exit the task function. */ |
|
598 task->running = TRUE; |
|
599 |
|
600 tclass = GST_TASK_GET_CLASS (task); |
|
601 |
|
602 /* push on the thread pool, we remember the original pool because the user |
|
603 * could change it later on and then we join to the wrong pool. */ |
|
604 priv->pool_id = gst_object_ref (priv->pool); |
|
605 priv->id = |
|
606 gst_task_pool_push (priv->pool_id, (GstTaskPoolFunction) gst_task_func, |
|
607 task, &error); |
|
608 |
|
609 if (error != NULL) { |
|
610 g_warning ("failed to create thread: %s", error->message); |
|
611 g_error_free (error); |
|
612 res = FALSE; |
|
613 } |
|
614 return res; |
|
615 } |
|
616 |
|
617 |
|
618 /** |
|
619 * gst_task_set_state: |
|
620 * @task: a #GstTask |
|
621 * @state: the new task state |
|
622 * |
|
623 * Sets the state of @task to @state. |
|
624 * |
|
625 * The @task must have a lock associated with it using |
|
626 * gst_task_set_lock() when going to GST_TASK_STARTED or GST_TASK_PAUSED or |
|
627 * this function will return %FALSE. |
|
628 * |
|
629 * MT safe. |
|
630 * |
|
631 * Returns: %TRUE if the state could be changed. |
|
632 * |
|
633 * Since: 0.10.24 |
377 */ |
634 */ |
378 #ifdef __SYMBIAN32__ |
635 #ifdef __SYMBIAN32__ |
379 EXPORT_C |
636 EXPORT_C |
380 #endif |
637 #endif |
381 |
638 |
382 gboolean |
639 gboolean |
383 gst_task_start (GstTask * task) |
640 gst_task_set_state (GstTask * task, GstTaskState state) |
384 { |
641 { |
385 GstTaskState old; |
642 GstTaskState old; |
|
643 gboolean res = TRUE; |
386 |
644 |
387 g_return_val_if_fail (GST_IS_TASK (task), FALSE); |
645 g_return_val_if_fail (GST_IS_TASK (task), FALSE); |
388 |
646 |
389 GST_DEBUG_OBJECT (task, "Starting task %p", task); |
647 GST_DEBUG_OBJECT (task, "Changing task %p to state %d", task, state); |
390 |
648 |
391 GST_OBJECT_LOCK (task); |
649 GST_OBJECT_LOCK (task); |
392 if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL)) |
650 if (state != GST_TASK_STOPPED) |
393 goto no_lock; |
651 if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL)) |
394 |
652 goto no_lock; |
|
653 |
|
654 /* if the state changed, do our thing */ |
395 old = task->state; |
655 old = task->state; |
396 task->state = GST_TASK_STARTED; |
656 if (old != state) { |
397 switch (old) { |
657 task->state = state; |
398 case GST_TASK_STOPPED: |
658 switch (old) { |
399 { |
659 case GST_TASK_STOPPED: |
400 GstTaskClass *tclass; |
660 /* If the task already has a thread scheduled we don't have to do |
401 |
661 * anything. */ |
402 /* If the task already has a thread scheduled we don't have to do |
662 if (G_UNLIKELY (!task->running)) |
403 * anything. */ |
663 res = start_task (task); |
404 if (task->running) |
|
405 break; |
664 break; |
406 |
665 case GST_TASK_PAUSED: |
407 /* new task, push on threadpool. We ref before so |
666 /* when we are paused, signal to go to the new state */ |
408 * that it remains alive while on the threadpool. */ |
667 GST_TASK_SIGNAL (task); |
409 gst_object_ref (task); |
668 break; |
410 /* mark task as running so that a join will wait until we schedule |
669 case GST_TASK_STARTED: |
411 * and exit the task function. */ |
670 /* if we were started, we'll go to the new state after the next |
412 task->running = TRUE; |
671 * iteration. */ |
413 |
672 break; |
414 tclass = GST_TASK_GET_CLASS (task); |
|
415 |
|
416 g_static_mutex_lock (&pool_lock); |
|
417 g_thread_pool_push (tclass->pool, task, NULL); |
|
418 g_static_mutex_unlock (&pool_lock); |
|
419 break; |
|
420 } |
673 } |
421 case GST_TASK_PAUSED: |
674 } |
422 /* PAUSE to PLAY, signal */ |
675 GST_OBJECT_UNLOCK (task); |
423 GST_TASK_SIGNAL (task); |
676 |
424 break; |
677 return res; |
425 case GST_TASK_STARTED: |
|
426 /* was OK */ |
|
427 break; |
|
428 } |
|
429 GST_OBJECT_UNLOCK (task); |
|
430 |
|
431 return TRUE; |
|
432 |
678 |
433 /* ERRORS */ |
679 /* ERRORS */ |
434 no_lock: |
680 no_lock: |
435 { |
681 { |
436 GST_WARNING_OBJECT (task, "starting task without a lock"); |
682 GST_WARNING_OBJECT (task, "state %d set on task without a lock", state); |
437 GST_OBJECT_UNLOCK (task); |
683 GST_OBJECT_UNLOCK (task); |
438 g_warning ("starting task without a lock"); |
684 g_warning ("task without a lock can't be set to state %d", state); |
439 return FALSE; |
685 return FALSE; |
440 } |
686 } |
|
687 } |
|
688 |
|
689 /** |
|
690 * gst_task_start: |
|
691 * @task: The #GstTask to start |
|
692 * |
|
693 * Starts @task. The @task must have a lock associated with it using |
|
694 * gst_task_set_lock() or this function will return %FALSE. |
|
695 * |
|
696 * Returns: %TRUE if the task could be started. |
|
697 * |
|
698 * MT safe. |
|
699 */ |
|
700 #ifdef __SYMBIAN32__ |
|
701 EXPORT_C |
|
702 #endif |
|
703 |
|
704 gboolean |
|
705 gst_task_start (GstTask * task) |
|
706 { |
|
707 return gst_task_set_state (task, GST_TASK_STARTED); |
441 } |
708 } |
442 |
709 |
443 /** |
710 /** |
444 * gst_task_stop: |
711 * gst_task_stop: |
445 * @task: The #GstTask to stop |
712 * @task: The #GstTask to stop |
446 * |
713 * |
447 * Stops @task. This method merely schedules the task to stop and |
714 * Stops @task. This method merely schedules the task to stop and |
448 * will not wait for the task to have completely stopped. Use |
715 * will not wait for the task to have completely stopped. Use |
449 * gst_task_join() to stop and wait for completion. |
716 * gst_task_join() to stop and wait for completion. |
450 * |
717 * |
451 * Returns: TRUE if the task could be stopped. |
718 * Returns: %TRUE if the task could be stopped. |
452 * |
719 * |
453 * MT safe. |
720 * MT safe. |
454 */ |
721 */ |
455 #ifdef __SYMBIAN32__ |
722 #ifdef __SYMBIAN32__ |
456 EXPORT_C |
723 EXPORT_C |
457 #endif |
724 #endif |
458 |
725 |
459 gboolean |
726 gboolean |
460 gst_task_stop (GstTask * task) |
727 gst_task_stop (GstTask * task) |
461 { |
728 { |
462 GstTaskClass *tclass; |
729 return gst_task_set_state (task, GST_TASK_STOPPED); |
463 GstTaskState old; |
|
464 |
|
465 g_return_val_if_fail (GST_IS_TASK (task), FALSE); |
|
466 |
|
467 tclass = GST_TASK_GET_CLASS (task); |
|
468 |
|
469 GST_DEBUG_OBJECT (task, "Stopping task %p", task); |
|
470 |
|
471 GST_OBJECT_LOCK (task); |
|
472 old = task->state; |
|
473 task->state = GST_TASK_STOPPED; |
|
474 switch (old) { |
|
475 case GST_TASK_STOPPED: |
|
476 break; |
|
477 case GST_TASK_PAUSED: |
|
478 GST_TASK_SIGNAL (task); |
|
479 break; |
|
480 case GST_TASK_STARTED: |
|
481 break; |
|
482 } |
|
483 GST_OBJECT_UNLOCK (task); |
|
484 |
|
485 return TRUE; |
|
486 } |
730 } |
487 |
731 |
488 /** |
732 /** |
489 * gst_task_pause: |
733 * gst_task_pause: |
490 * @task: The #GstTask to pause |
734 * @task: The #GstTask to pause |
492 * Pauses @task. This method can also be called on a task in the |
736 * Pauses @task. This method can also be called on a task in the |
493 * stopped state, in which case a thread will be started and will remain |
737 * stopped state, in which case a thread will be started and will remain |
494 * in the paused state. This function does not wait for the task to complete |
738 * in the paused state. This function does not wait for the task to complete |
495 * the paused state. |
739 * the paused state. |
496 * |
740 * |
497 * Returns: TRUE if the task could be paused. |
741 * Returns: %TRUE if the task could be paused. |
498 * |
742 * |
499 * MT safe. |
743 * MT safe. |
500 */ |
744 */ |
501 #ifdef __SYMBIAN32__ |
745 #ifdef __SYMBIAN32__ |
502 EXPORT_C |
746 EXPORT_C |
503 #endif |
747 #endif |
504 gboolean |
748 gboolean |
505 gst_task_pause (GstTask * task) |
749 gst_task_pause (GstTask * task) |
506 { |
750 { |
507 GstTaskState old; |
751 return gst_task_set_state (task, GST_TASK_PAUSED); |
508 |
|
509 g_return_val_if_fail (GST_IS_TASK (task), FALSE); |
|
510 |
|
511 GST_DEBUG_OBJECT (task, "Pausing task %p", task); |
|
512 |
|
513 GST_OBJECT_LOCK (task); |
|
514 if (G_UNLIKELY (GST_TASK_GET_LOCK (task) == NULL)) |
|
515 goto no_lock; |
|
516 |
|
517 old = task->state; |
|
518 task->state = GST_TASK_PAUSED; |
|
519 switch (old) { |
|
520 case GST_TASK_STOPPED: |
|
521 { |
|
522 GstTaskClass *tclass; |
|
523 |
|
524 if (task->running) |
|
525 break; |
|
526 |
|
527 gst_object_ref (task); |
|
528 task->running = TRUE; |
|
529 |
|
530 tclass = GST_TASK_GET_CLASS (task); |
|
531 |
|
532 g_static_mutex_lock (&pool_lock); |
|
533 g_thread_pool_push (tclass->pool, task, NULL); |
|
534 g_static_mutex_unlock (&pool_lock); |
|
535 break; |
|
536 } |
|
537 case GST_TASK_PAUSED: |
|
538 break; |
|
539 case GST_TASK_STARTED: |
|
540 break; |
|
541 } |
|
542 GST_OBJECT_UNLOCK (task); |
|
543 |
|
544 return TRUE; |
|
545 |
|
546 /* ERRORS */ |
|
547 no_lock: |
|
548 { |
|
549 GST_WARNING_OBJECT (task, "pausing task without a lock"); |
|
550 GST_OBJECT_UNLOCK (task); |
|
551 g_warning ("pausing task without a lock"); |
|
552 return FALSE; |
|
553 } |
|
554 } |
752 } |
555 |
753 |
556 /** |
754 /** |
557 * gst_task_join: |
755 * gst_task_join: |
558 * @task: The #GstTask to join |
756 * @task: The #GstTask to join |
576 |
774 |
577 gboolean |
775 gboolean |
578 gst_task_join (GstTask * task) |
776 gst_task_join (GstTask * task) |
579 { |
777 { |
580 GThread *tself; |
778 GThread *tself; |
|
779 GstTaskPrivate *priv; |
|
780 gpointer id; |
|
781 GstTaskPool *pool = NULL; |
|
782 |
|
783 priv = task->priv; |
581 |
784 |
582 g_return_val_if_fail (GST_IS_TASK (task), FALSE); |
785 g_return_val_if_fail (GST_IS_TASK (task), FALSE); |
583 |
786 |
584 tself = g_thread_self (); |
787 tself = g_thread_self (); |
585 |
788 |
586 GST_DEBUG_OBJECT (task, "Joining task %p, thread %p", task, tself); |
789 GST_DEBUG_OBJECT (task, "Joining task %p, thread %p", task, tself); |
587 |
790 |
588 /* we don't use a real thread join here because we are using |
791 /* we don't use a real thread join here because we are using |
589 * threadpools */ |
792 * thread pools */ |
590 GST_OBJECT_LOCK (task); |
793 GST_OBJECT_LOCK (task); |
591 if (G_UNLIKELY (tself == task->abidata.ABI.thread)) |
794 if (G_UNLIKELY (tself == task->abidata.ABI.thread)) |
592 goto joining_self; |
795 goto joining_self; |
593 task->state = GST_TASK_STOPPED; |
796 task->state = GST_TASK_STOPPED; |
594 /* signal the state change for when it was blocked in PAUSED. */ |
797 /* signal the state change for when it was blocked in PAUSED. */ |
595 GST_TASK_SIGNAL (task); |
798 GST_TASK_SIGNAL (task); |
596 /* we set the running flag when pushing the task on the threadpool. |
799 /* we set the running flag when pushing the task on the thread pool. |
597 * This means that the task function might not be called when we try |
800 * This means that the task function might not be called when we try |
598 * to join it here. */ |
801 * to join it here. */ |
599 while (G_LIKELY (task->running)) |
802 while (G_LIKELY (task->running)) |
600 GST_TASK_WAIT (task); |
803 GST_TASK_WAIT (task); |
601 GST_OBJECT_UNLOCK (task); |
804 /* clean the thread */ |
|
805 task->abidata.ABI.thread = NULL; |
|
806 /* get the id and pool to join */ |
|
807 pool = priv->pool_id; |
|
808 id = priv->id; |
|
809 priv->pool_id = NULL; |
|
810 priv->id = NULL; |
|
811 GST_OBJECT_UNLOCK (task); |
|
812 |
|
813 if (pool) { |
|
814 if (id) |
|
815 gst_task_pool_join (pool, id); |
|
816 gst_object_unref (pool); |
|
817 } |
602 |
818 |
603 GST_DEBUG_OBJECT (task, "Joined task %p", task); |
819 GST_DEBUG_OBJECT (task, "Joined task %p", task); |
604 |
820 |
605 return TRUE; |
821 return TRUE; |
606 |
822 |