|
1 /* GStreamer |
|
2 * Copyright (C) 2005 Wim Taymans <wim@fluendo.com> |
|
3 * |
|
4 * gstcollectpads.c: |
|
5 * |
|
6 * This library is free software; you can redistribute it and/or |
|
7 * modify it under the terms of the GNU Library General Public |
|
8 * License as published by the Free Software Foundation; either |
|
9 * version 2 of the License, or (at your option) any later version. |
|
10 * |
|
11 * This library is distributed in the hope that it will be useful, |
|
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
|
14 * Library General Public License for more details. |
|
15 * |
|
16 * You should have received a copy of the GNU Library General Public |
|
17 * License along with this library; if not, write to the |
|
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, |
|
19 * Boston, MA 02111-1307, USA. |
|
20 */ |
|
21 /** |
|
22 * SECTION:gstcollectpads |
|
23 * @short_description: manages a set of pads that operate in collect mode |
|
24 * @see_also: |
|
25 * |
|
26 * Manages a set of pads that operate in collect mode. This means that control |
|
27 * is given to the manager of this object when all pads have data. |
|
28 * <itemizedlist> |
|
29 * <listitem><para> |
|
30 * Collectpads are created with gst_collect_pads_new(). A callback should then |
|
31 * be installed with gst_collect_pads_set_function (). |
|
32 * </para></listitem> |
|
33 * <listitem><para> |
|
34 * Pads are added to the collection with gst_collect_pads_add_pad()/ |
|
35 * gst_collect_pads_remove_pad(). The pad |
|
36 * has to be a sinkpad. The chain and event functions of the pad are |
|
37 * overridden. The element_private of the pad is used to store |
|
38 * private information for the collectpads. |
|
39 * </para></listitem> |
|
40 * <listitem><para> |
|
41 * For each pad, data is queued in the _chain function or by |
|
42 * performing a pull_range. |
|
43 * </para></listitem> |
|
44 * <listitem><para> |
|
45 * When data is queued on all pads, the callback function is called. |
|
46 * </para></listitem> |
|
47 * <listitem><para> |
|
48 * Data can be dequeued from the pad with the gst_collect_pads_pop() method. |
|
49 * One can peek at the data with the gst_collect_pads_peek() function. |
|
50 * These functions will return NULL if the pad received an EOS event. When all |
|
51 * pads return NULL from a gst_collect_pads_peek(), the element can emit an EOS |
|
52 * event itself. |
|
53 * </para></listitem> |
|
54 * <listitem><para> |
|
55 * Data can also be dequeued in byte units using the gst_collect_pads_available(), |
|
56 * gst_collect_pads_read() and gst_collect_pads_flush() calls. |
|
57 * </para></listitem> |
|
58 * <listitem><para> |
|
59 * Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in |
|
60 * their state change functions to start and stop the processing of the collectpads. |
|
61 * The gst_collect_pads_stop() call should be called before calling the parent |
|
62 * element state change function in the PAUSED_TO_READY state change to ensure |
|
63 * no pad is blocked and the element can finish streaming. |
|
64 * </para></listitem> |
|
65 * <listitem><para> |
|
66 * gst_collect_pads_collect() and gst_collect_pads_collect_range() can be used by |
|
67 * elements that start a #GstTask to drive the collect_pads. This feature is however |
|
68 * not yet implemented. |
|
69 * </para></listitem> |
|
70 * </itemizedlist> |
|
71 * |
|
72 * Last reviewed on 2006-05-10 (0.10.6) |
|
73 */ |
|
74 |
|
75 #ifdef __SYMBIAN32__ |
|
76 #include <gst_global.h> |
|
77 #endif |
|
78 #include "gstcollectpads.h" |
|
79 |
|
80 #ifdef __SYMBIAN32__ |
|
81 #include <glib_global.h> |
|
82 #include <gobject_global.h> |
|
83 #endif |
|
84 GST_DEBUG_CATEGORY_STATIC (collect_pads_debug); |
|
85 #define GST_CAT_DEFAULT collect_pads_debug |
|
86 |
|
87 GST_BOILERPLATE (GstCollectPads, gst_collect_pads, GstObject, GST_TYPE_OBJECT); |
|
88 |
|
89 static void gst_collect_pads_clear (GstCollectPads * pads, |
|
90 GstCollectData * data); |
|
91 static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer); |
|
92 static gboolean gst_collect_pads_event (GstPad * pad, GstEvent * event); |
|
93 static void gst_collect_pads_finalize (GObject * object); |
|
94 static void gst_collect_pads_init (GstCollectPads * pads, |
|
95 GstCollectPadsClass * g_class); |
|
96 static void ref_data (GstCollectData * data); |
|
97 static void unref_data (GstCollectData * data); |
|
98 |
|
99 static void |
|
100 gst_collect_pads_base_init (gpointer g_class) |
|
101 { |
|
102 /* Do nothing here */ |
|
103 } |
|
104 |
|
105 static void |
|
106 gst_collect_pads_class_init (GstCollectPadsClass * klass) |
|
107 { |
|
108 GObjectClass *gobject_class = (GObjectClass *) klass; |
|
109 |
|
110 GST_DEBUG_CATEGORY_INIT (collect_pads_debug, "collectpads", 0, |
|
111 "GstCollectPads"); |
|
112 |
|
113 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_collect_pads_finalize); |
|
114 } |
|
115 |
|
116 static void |
|
117 gst_collect_pads_init (GstCollectPads * pads, GstCollectPadsClass * g_class) |
|
118 { |
|
119 pads->cond = g_cond_new (); |
|
120 pads->data = NULL; |
|
121 pads->cookie = 0; |
|
122 pads->numpads = 0; |
|
123 pads->queuedpads = 0; |
|
124 pads->eospads = 0; |
|
125 pads->started = FALSE; |
|
126 |
|
127 /* members to manage the pad list */ |
|
128 pads->abidata.ABI.pad_lock = g_mutex_new (); |
|
129 pads->abidata.ABI.pad_cookie = 0; |
|
130 pads->abidata.ABI.pad_list = NULL; |
|
131 } |
|
132 |
|
133 static void |
|
134 gst_collect_pads_finalize (GObject * object) |
|
135 { |
|
136 GSList *collected; |
|
137 GstCollectPads *pads = GST_COLLECT_PADS (object); |
|
138 |
|
139 GST_DEBUG ("finalize"); |
|
140 |
|
141 g_cond_free (pads->cond); |
|
142 g_mutex_free (pads->abidata.ABI.pad_lock); |
|
143 |
|
144 /* Remove pads */ |
|
145 collected = pads->abidata.ABI.pad_list; |
|
146 for (; collected; collected = g_slist_next (collected)) { |
|
147 GstCollectData *pdata = (GstCollectData *) collected->data; |
|
148 |
|
149 unref_data (pdata); |
|
150 } |
|
151 /* Free pads list */ |
|
152 g_slist_foreach (pads->data, (GFunc) unref_data, NULL); |
|
153 g_slist_free (pads->data); |
|
154 g_slist_free (pads->abidata.ABI.pad_list); |
|
155 |
|
156 G_OBJECT_CLASS (parent_class)->finalize (object); |
|
157 } |
|
158 |
|
159 /** |
|
160 * gst_collect_pads_new: |
|
161 * |
|
162 * Create a new instance of #GstCollectPads. |
|
163 * |
|
164 * Returns: a new #GstCollectPads, or NULL in case of an error. |
|
165 * |
|
166 * MT safe. |
|
167 */ |
|
168 #ifdef __SYMBIAN32__ |
|
169 EXPORT_C |
|
170 #endif |
|
171 |
|
172 GstCollectPads * |
|
173 gst_collect_pads_new (void) |
|
174 { |
|
175 GstCollectPads *newcoll; |
|
176 |
|
177 newcoll = g_object_new (GST_TYPE_COLLECT_PADS, NULL); |
|
178 |
|
179 return newcoll; |
|
180 } |
|
181 |
|
182 /** |
|
183 * gst_collect_pads_set_function: |
|
184 * @pads: the collectspads to use |
|
185 * @func: the function to set |
|
186 * @user_data: user data passed to the function |
|
187 * |
|
188 * Set the callback function and user data that will be called when |
|
189 * all the pads added to the collection have buffers queued. |
|
190 * |
|
191 * MT safe. |
|
192 */ |
|
193 #ifdef __SYMBIAN32__ |
|
194 EXPORT_C |
|
195 #endif |
|
196 |
|
197 void |
|
198 gst_collect_pads_set_function (GstCollectPads * pads, |
|
199 GstCollectPadsFunction func, gpointer user_data) |
|
200 { |
|
201 g_return_if_fail (pads != NULL); |
|
202 g_return_if_fail (GST_IS_COLLECT_PADS (pads)); |
|
203 |
|
204 GST_OBJECT_LOCK (pads); |
|
205 pads->func = func; |
|
206 pads->user_data = user_data; |
|
207 GST_OBJECT_UNLOCK (pads); |
|
208 } |
|
209 |
|
210 static void |
|
211 ref_data (GstCollectData * data) |
|
212 { |
|
213 g_assert (data != NULL); |
|
214 |
|
215 g_atomic_int_inc (&(data->abidata.ABI.refcount)); |
|
216 } |
|
217 |
|
218 static void |
|
219 unref_data (GstCollectData * data) |
|
220 { |
|
221 GstCollectDataDestroyNotify destroy_notify; |
|
222 |
|
223 g_assert (data != NULL); |
|
224 g_assert (data->abidata.ABI.refcount > 0); |
|
225 |
|
226 if (!g_atomic_int_dec_and_test (&(data->abidata.ABI.refcount))) |
|
227 return; |
|
228 |
|
229 /* FIXME: Ugly hack as we can't add more fields to GstCollectData */ |
|
230 destroy_notify = (GstCollectDataDestroyNotify) |
|
231 g_object_get_data (G_OBJECT (data->pad), |
|
232 "gst-collect-data-destroy-notify"); |
|
233 |
|
234 if (destroy_notify) |
|
235 destroy_notify (data); |
|
236 |
|
237 g_object_unref (data->pad); |
|
238 if (data->buffer) { |
|
239 gst_buffer_unref (data->buffer); |
|
240 } |
|
241 g_free (data); |
|
242 } |
|
243 |
|
244 /** |
|
245 * gst_collect_pads_add_pad: |
|
246 * @pads: the collectspads to use |
|
247 * @pad: the pad to add |
|
248 * @size: the size of the returned #GstCollectData structure |
|
249 * |
|
250 * Add a pad to the collection of collect pads. The pad has to be |
|
251 * a sinkpad. The refcount of the pad is incremented. Use |
|
252 * gst_collect_pads_remove_pad() to remove the pad from the collection |
|
253 * again. |
|
254 * |
|
255 * You specify a size for the returned #GstCollectData structure |
|
256 * so that you can use it to store additional information. |
|
257 * |
|
258 * The pad will be automatically activated in push mode when @pads is |
|
259 * started. |
|
260 * |
|
261 * This function calls gst_collect_pads_add_pad_full() passing a value of NULL |
|
262 * for destroy_notify. |
|
263 * |
|
264 * Returns: a new #GstCollectData to identify the new pad. Or NULL |
|
265 * if wrong parameters are supplied. |
|
266 * |
|
267 * MT safe. |
|
268 */ |
|
269 #ifdef __SYMBIAN32__ |
|
270 EXPORT_C |
|
271 #endif |
|
272 |
|
273 GstCollectData * |
|
274 gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size) |
|
275 { |
|
276 return gst_collect_pads_add_pad_full (pads, pad, size, NULL); |
|
277 } |
|
278 |
|
279 /** |
|
280 * gst_collect_pads_add_pad_full: |
|
281 * @pads: the collectspads to use |
|
282 * @pad: the pad to add |
|
283 * @size: the size of the returned #GstCollectData structure |
|
284 * @destroy_notify: function to be called before the returned #GstCollectData |
|
285 * structure is freed |
|
286 * |
|
287 * Add a pad to the collection of collect pads. The pad has to be |
|
288 * a sinkpad. The refcount of the pad is incremented. Use |
|
289 * gst_collect_pads_remove_pad() to remove the pad from the collection |
|
290 * again. |
|
291 * |
|
292 * You specify a size for the returned #GstCollectData structure |
|
293 * so that you can use it to store additional information. |
|
294 * |
|
295 * You can also specify a #GstCollectDataDestroyNotify that will be called |
|
296 * just before the #GstCollectData structure is freed. It is passed the |
|
297 * pointer to the structure and should free any custom memory and resources |
|
298 * allocated for it. |
|
299 * |
|
300 * The pad will be automatically activated in push mode when @pads is |
|
301 * started. |
|
302 * |
|
303 * Since: 0.10.12 |
|
304 * |
|
305 * Returns: a new #GstCollectData to identify the new pad. Or NULL |
|
306 * if wrong parameters are supplied. |
|
307 * |
|
308 * MT safe. |
|
309 */ |
|
310 #ifdef __SYMBIAN32__ |
|
311 EXPORT_C |
|
312 #endif |
|
313 |
|
314 GstCollectData * |
|
315 gst_collect_pads_add_pad_full (GstCollectPads * pads, GstPad * pad, guint size, |
|
316 GstCollectDataDestroyNotify destroy_notify) |
|
317 { |
|
318 GstCollectData *data; |
|
319 |
|
320 g_return_val_if_fail (pads != NULL, NULL); |
|
321 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL); |
|
322 g_return_val_if_fail (pad != NULL, NULL); |
|
323 g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL); |
|
324 g_return_val_if_fail (size >= sizeof (GstCollectData), NULL); |
|
325 |
|
326 GST_DEBUG ("adding pad %s:%s", GST_DEBUG_PAD_NAME (pad)); |
|
327 |
|
328 data = g_malloc0 (size); |
|
329 data->collect = pads; |
|
330 data->pad = gst_object_ref (pad); |
|
331 data->buffer = NULL; |
|
332 data->pos = 0; |
|
333 gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED); |
|
334 data->abidata.ABI.flushing = FALSE; |
|
335 data->abidata.ABI.new_segment = FALSE; |
|
336 data->abidata.ABI.eos = FALSE; |
|
337 data->abidata.ABI.refcount = 1; |
|
338 |
|
339 /* FIXME: Ugly hack as we can't add more fields to GstCollectData */ |
|
340 g_object_set_data (G_OBJECT (pad), "gst-collect-data-destroy-notify", |
|
341 (void *) destroy_notify); |
|
342 |
|
343 GST_COLLECT_PADS_PAD_LOCK (pads); |
|
344 GST_OBJECT_LOCK (pad); |
|
345 gst_pad_set_element_private (pad, data); |
|
346 GST_OBJECT_UNLOCK (pad); |
|
347 pads->abidata.ABI.pad_list = |
|
348 g_slist_append (pads->abidata.ABI.pad_list, data); |
|
349 gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain)); |
|
350 gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event)); |
|
351 /* activate the pad when needed */ |
|
352 if (pads->started) |
|
353 gst_pad_set_active (pad, TRUE); |
|
354 pads->abidata.ABI.pad_cookie++; |
|
355 GST_COLLECT_PADS_PAD_UNLOCK (pads); |
|
356 |
|
357 return data; |
|
358 } |
|
359 |
|
360 static gint |
|
361 find_pad (GstCollectData * data, GstPad * pad) |
|
362 { |
|
363 if (data->pad == pad) |
|
364 return 0; |
|
365 return 1; |
|
366 } |
|
367 |
|
368 /** |
|
369 * gst_collect_pads_remove_pad: |
|
370 * @pads: the collectspads to use |
|
371 * @pad: the pad to remove |
|
372 * |
|
373 * Remove a pad from the collection of collect pads. This function will also |
|
374 * free the #GstCollectData and all the resources that were allocated with |
|
375 * gst_collect_pads_add_pad(). |
|
376 * |
|
377 * The pad will be deactivated automatically when @pads is stopped. |
|
378 * |
|
379 * Returns: %TRUE if the pad could be removed. |
|
380 * |
|
381 * MT safe. |
|
382 */ |
|
383 #ifdef __SYMBIAN32__ |
|
384 EXPORT_C |
|
385 #endif |
|
386 |
|
387 gboolean |
|
388 gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad) |
|
389 { |
|
390 GstCollectData *data; |
|
391 GSList *list; |
|
392 |
|
393 g_return_val_if_fail (pads != NULL, FALSE); |
|
394 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE); |
|
395 g_return_val_if_fail (pad != NULL, FALSE); |
|
396 g_return_val_if_fail (GST_IS_PAD (pad), FALSE); |
|
397 |
|
398 GST_DEBUG ("removing pad %s:%s", GST_DEBUG_PAD_NAME (pad)); |
|
399 |
|
400 GST_COLLECT_PADS_PAD_LOCK (pads); |
|
401 list = |
|
402 g_slist_find_custom (pads->abidata.ABI.pad_list, pad, |
|
403 (GCompareFunc) find_pad); |
|
404 if (!list) |
|
405 goto unknown_pad; |
|
406 |
|
407 data = (GstCollectData *) list->data; |
|
408 |
|
409 GST_DEBUG ("found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad), data); |
|
410 |
|
411 /* clear the stuff we configured */ |
|
412 gst_pad_set_chain_function (pad, NULL); |
|
413 gst_pad_set_event_function (pad, NULL); |
|
414 GST_OBJECT_LOCK (pad); |
|
415 gst_pad_set_element_private (pad, NULL); |
|
416 GST_OBJECT_UNLOCK (pad); |
|
417 |
|
418 /* backward compat, also remove from data if stopped, note that this function |
|
419 * can only be called when we are stopped because we don't take the LOCK to |
|
420 * protect the pads->data list. */ |
|
421 if (!pads->started) { |
|
422 GSList *dlist; |
|
423 |
|
424 dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad); |
|
425 if (dlist) { |
|
426 GstCollectData *pdata = dlist->data; |
|
427 |
|
428 pads->data = g_slist_delete_link (pads->data, dlist); |
|
429 unref_data (pdata); |
|
430 } |
|
431 } |
|
432 /* remove from the pad list */ |
|
433 pads->abidata.ABI.pad_list = |
|
434 g_slist_delete_link (pads->abidata.ABI.pad_list, list); |
|
435 pads->abidata.ABI.pad_cookie++; |
|
436 |
|
437 /* signal waiters because something changed */ |
|
438 GST_COLLECT_PADS_BROADCAST (pads); |
|
439 |
|
440 /* deactivate the pad when needed */ |
|
441 if (!pads->started) |
|
442 gst_pad_set_active (pad, FALSE); |
|
443 |
|
444 /* clean and free the collect data */ |
|
445 unref_data (data); |
|
446 |
|
447 GST_COLLECT_PADS_PAD_UNLOCK (pads); |
|
448 |
|
449 return TRUE; |
|
450 |
|
451 unknown_pad: |
|
452 { |
|
453 GST_WARNING ("cannot remove unknown pad %s:%s", GST_DEBUG_PAD_NAME (pad)); |
|
454 GST_COLLECT_PADS_PAD_UNLOCK (pads); |
|
455 return FALSE; |
|
456 } |
|
457 } |
|
458 |
|
459 /** |
|
460 * gst_collect_pads_is_active: |
|
461 * @pads: the collectspads to use |
|
462 * @pad: the pad to check |
|
463 * |
|
464 * Check if a pad is active. |
|
465 * |
|
466 * This function is currently not implemented. |
|
467 * |
|
468 * Returns: %TRUE if the pad is active. |
|
469 * |
|
470 * MT safe. |
|
471 */ |
|
472 #ifdef __SYMBIAN32__ |
|
473 EXPORT_C |
|
474 #endif |
|
475 |
|
476 gboolean |
|
477 gst_collect_pads_is_active (GstCollectPads * pads, GstPad * pad) |
|
478 { |
|
479 g_return_val_if_fail (pads != NULL, FALSE); |
|
480 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE); |
|
481 g_return_val_if_fail (pad != NULL, FALSE); |
|
482 g_return_val_if_fail (GST_IS_PAD (pad), FALSE); |
|
483 |
|
484 g_warning ("gst_collect_pads_is_active() is not implemented"); |
|
485 |
|
486 return FALSE; |
|
487 } |
|
488 |
|
489 /** |
|
490 * gst_collect_pads_collect: |
|
491 * @pads: the collectspads to use |
|
492 * |
|
493 * Collect data on all pads. This function is usually called |
|
494 * from a #GstTask function in an element. |
|
495 * |
|
496 * This function is currently not implemented. |
|
497 * |
|
498 * Returns: #GstFlowReturn of the operation. |
|
499 * |
|
500 * MT safe. |
|
501 */ |
|
502 #ifdef __SYMBIAN32__ |
|
503 EXPORT_C |
|
504 #endif |
|
505 |
|
506 GstFlowReturn |
|
507 gst_collect_pads_collect (GstCollectPads * pads) |
|
508 { |
|
509 g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR); |
|
510 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR); |
|
511 |
|
512 g_warning ("gst_collect_pads_collect() is not implemented"); |
|
513 |
|
514 return GST_FLOW_NOT_SUPPORTED; |
|
515 } |
|
516 |
|
517 /** |
|
518 * gst_collect_pads_collect_range: |
|
519 * @pads: the collectspads to use |
|
520 * @offset: the offset to collect |
|
521 * @length: the length to collect |
|
522 * |
|
523 * Collect data with @offset and @length on all pads. This function |
|
524 * is typically called in the getrange function of an element. |
|
525 * |
|
526 * This function is currently not implemented. |
|
527 * |
|
528 * Returns: #GstFlowReturn of the operation. |
|
529 * |
|
530 * MT safe. |
|
531 */ |
|
532 #ifdef __SYMBIAN32__ |
|
533 EXPORT_C |
|
534 #endif |
|
535 |
|
536 GstFlowReturn |
|
537 gst_collect_pads_collect_range (GstCollectPads * pads, guint64 offset, |
|
538 guint length) |
|
539 { |
|
540 g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR); |
|
541 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR); |
|
542 |
|
543 g_warning ("gst_collect_pads_collect_range() is not implemented"); |
|
544 |
|
545 return GST_FLOW_NOT_SUPPORTED; |
|
546 } |
|
547 |
|
548 /* FIXME, I think this function is used to work around bad behaviour |
|
549 * of elements that add pads to themselves without activating them. |
|
550 * |
|
551 * Must be called with PAD_LOCK. |
|
552 */ |
|
553 static void |
|
554 gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads, |
|
555 gboolean flushing) |
|
556 { |
|
557 GSList *walk = NULL; |
|
558 |
|
559 /* Update the pads flushing flag */ |
|
560 for (walk = pads->data; walk; walk = g_slist_next (walk)) { |
|
561 GstCollectData *cdata = walk->data; |
|
562 |
|
563 if (GST_IS_PAD (cdata->pad)) { |
|
564 GST_OBJECT_LOCK (cdata->pad); |
|
565 if (flushing) |
|
566 GST_PAD_SET_FLUSHING (cdata->pad); |
|
567 else |
|
568 GST_PAD_UNSET_FLUSHING (cdata->pad); |
|
569 cdata->abidata.ABI.flushing = flushing; |
|
570 gst_collect_pads_clear (pads, cdata); |
|
571 GST_OBJECT_UNLOCK (cdata->pad); |
|
572 } |
|
573 } |
|
574 } |
|
575 |
|
576 /** |
|
577 * gst_collect_pads_set_flushing: |
|
578 * @pads: the collectspads to use |
|
579 * @flushing: desired state of the pads |
|
580 * |
|
581 * Change the flushing state of all the pads in the collection. No pad |
|
582 * is able to accept anymore data when @flushing is %TRUE. Calling this |
|
583 * function with @flushing %FALSE makes @pads accept data again. |
|
584 * |
|
585 * MT safe. |
|
586 * |
|
587 * Since: 0.10.7. |
|
588 */ |
|
589 #ifdef __SYMBIAN32__ |
|
590 EXPORT_C |
|
591 #endif |
|
592 |
|
593 void |
|
594 gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing) |
|
595 { |
|
596 g_return_if_fail (pads != NULL); |
|
597 g_return_if_fail (GST_IS_COLLECT_PADS (pads)); |
|
598 |
|
599 GST_COLLECT_PADS_PAD_LOCK (pads); |
|
600 gst_collect_pads_set_flushing_unlocked (pads, flushing); |
|
601 GST_COLLECT_PADS_PAD_UNLOCK (pads); |
|
602 } |
|
603 |
|
604 /** |
|
605 * gst_collect_pads_start: |
|
606 * @pads: the collectspads to use |
|
607 * |
|
608 * Starts the processing of data in the collect_pads. |
|
609 * |
|
610 * MT safe. |
|
611 */ |
|
612 #ifdef __SYMBIAN32__ |
|
613 EXPORT_C |
|
614 #endif |
|
615 |
|
616 void |
|
617 gst_collect_pads_start (GstCollectPads * pads) |
|
618 { |
|
619 GSList *collected; |
|
620 |
|
621 g_return_if_fail (pads != NULL); |
|
622 g_return_if_fail (GST_IS_COLLECT_PADS (pads)); |
|
623 |
|
624 GST_DEBUG_OBJECT (pads, "starting collect pads"); |
|
625 |
|
626 /* make sure stop and collect cannot be called anymore */ |
|
627 GST_OBJECT_LOCK (pads); |
|
628 |
|
629 /* make pads streamable */ |
|
630 GST_COLLECT_PADS_PAD_LOCK (pads); |
|
631 |
|
632 /* loop over the master pad list and reset the segment */ |
|
633 collected = pads->abidata.ABI.pad_list; |
|
634 for (; collected; collected = g_slist_next (collected)) { |
|
635 GstCollectData *data; |
|
636 |
|
637 data = collected->data; |
|
638 gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED); |
|
639 } |
|
640 |
|
641 gst_collect_pads_set_flushing_unlocked (pads, FALSE); |
|
642 |
|
643 /* Start collect pads */ |
|
644 pads->started = TRUE; |
|
645 GST_COLLECT_PADS_PAD_UNLOCK (pads); |
|
646 GST_OBJECT_UNLOCK (pads); |
|
647 } |
|
648 |
|
649 /** |
|
650 * gst_collect_pads_stop: |
|
651 * @pads: the collectspads to use |
|
652 * |
|
653 * Stops the processing of data in the collect_pads. this function |
|
654 * will also unblock any blocking operations. |
|
655 * |
|
656 * MT safe. |
|
657 */ |
|
658 #ifdef __SYMBIAN32__ |
|
659 EXPORT_C |
|
660 #endif |
|
661 |
|
662 void |
|
663 gst_collect_pads_stop (GstCollectPads * pads) |
|
664 { |
|
665 GSList *collected; |
|
666 |
|
667 g_return_if_fail (pads != NULL); |
|
668 g_return_if_fail (GST_IS_COLLECT_PADS (pads)); |
|
669 |
|
670 GST_DEBUG_OBJECT (pads, "stopping collect pads"); |
|
671 |
|
672 /* make sure collect and start cannot be called anymore */ |
|
673 GST_OBJECT_LOCK (pads); |
|
674 |
|
675 /* make pads not accept data anymore */ |
|
676 GST_COLLECT_PADS_PAD_LOCK (pads); |
|
677 gst_collect_pads_set_flushing_unlocked (pads, TRUE); |
|
678 |
|
679 /* Stop collect pads */ |
|
680 pads->started = FALSE; |
|
681 pads->eospads = 0; |
|
682 pads->queuedpads = 0; |
|
683 |
|
684 /* loop over the master pad list and flush buffers */ |
|
685 collected = pads->abidata.ABI.pad_list; |
|
686 for (; collected; collected = g_slist_next (collected)) { |
|
687 GstCollectData *data; |
|
688 GstBuffer **buffer_p; |
|
689 |
|
690 data = collected->data; |
|
691 if (data->buffer) { |
|
692 buffer_p = &data->buffer; |
|
693 gst_buffer_replace (buffer_p, NULL); |
|
694 data->pos = 0; |
|
695 } |
|
696 data->abidata.ABI.eos = FALSE; |
|
697 } |
|
698 |
|
699 GST_COLLECT_PADS_PAD_UNLOCK (pads); |
|
700 /* Wake them up so they can end the chain functions. */ |
|
701 GST_COLLECT_PADS_BROADCAST (pads); |
|
702 |
|
703 GST_OBJECT_UNLOCK (pads); |
|
704 } |
|
705 |
|
706 /** |
|
707 * gst_collect_pads_peek: |
|
708 * @pads: the collectspads to peek |
|
709 * @data: the data to use |
|
710 * |
|
711 * Peek at the buffer currently queued in @data. This function |
|
712 * should be called with the @pads LOCK held, such as in the callback |
|
713 * handler. |
|
714 * |
|
715 * Returns: The buffer in @data or NULL if no buffer is queued. |
|
716 * You should unref the buffer after usage. |
|
717 * |
|
718 * MT safe. |
|
719 */ |
|
720 #ifdef __SYMBIAN32__ |
|
721 EXPORT_C |
|
722 #endif |
|
723 |
|
724 GstBuffer * |
|
725 gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data) |
|
726 { |
|
727 GstBuffer *result; |
|
728 |
|
729 g_return_val_if_fail (pads != NULL, NULL); |
|
730 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL); |
|
731 g_return_val_if_fail (data != NULL, NULL); |
|
732 |
|
733 if ((result = data->buffer)) |
|
734 gst_buffer_ref (result); |
|
735 |
|
736 GST_DEBUG ("Peeking at pad %s:%s: buffer=%p", |
|
737 GST_DEBUG_PAD_NAME (data->pad), result); |
|
738 |
|
739 return result; |
|
740 } |
|
741 |
|
742 /** |
|
743 * gst_collect_pads_pop: |
|
744 * @pads: the collectspads to pop |
|
745 * @data: the data to use |
|
746 * |
|
747 * Pop the buffer currently queued in @data. This function |
|
748 * should be called with the @pads LOCK held, such as in the callback |
|
749 * handler. |
|
750 * |
|
751 * Returns: The buffer in @data or NULL if no buffer was queued. |
|
752 * You should unref the buffer after usage. |
|
753 * |
|
754 * MT safe. |
|
755 */ |
|
756 #ifdef __SYMBIAN32__ |
|
757 EXPORT_C |
|
758 #endif |
|
759 |
|
760 GstBuffer * |
|
761 gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data) |
|
762 { |
|
763 GstBuffer *result; |
|
764 |
|
765 g_return_val_if_fail (pads != NULL, NULL); |
|
766 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL); |
|
767 g_return_val_if_fail (data != NULL, NULL); |
|
768 |
|
769 if ((result = data->buffer)) { |
|
770 data->buffer = NULL; |
|
771 data->pos = 0; |
|
772 /* one less pad with queued data now */ |
|
773 pads->queuedpads--; |
|
774 } |
|
775 |
|
776 GST_COLLECT_PADS_BROADCAST (pads); |
|
777 |
|
778 GST_DEBUG ("Pop buffer on pad %s:%s: buffer=%p", |
|
779 GST_DEBUG_PAD_NAME (data->pad), result); |
|
780 |
|
781 return result; |
|
782 } |
|
783 |
|
784 /* pop and unref the currently queued buffer, should be called with the LOCK |
|
785 * held. */ |
|
786 static void |
|
787 gst_collect_pads_clear (GstCollectPads * pads, GstCollectData * data) |
|
788 { |
|
789 GstBuffer *buf; |
|
790 |
|
791 if ((buf = gst_collect_pads_pop (pads, data))) |
|
792 gst_buffer_unref (buf); |
|
793 } |
|
794 |
|
795 /** |
|
796 * gst_collect_pads_available: |
|
797 * @pads: the collectspads to query |
|
798 * |
|
799 * Query how many bytes can be read from each queued buffer. This means |
|
800 * that the result of this call is the maximum number of bytes that can |
|
801 * be read from each of the pads. |
|
802 * |
|
803 * This function should be called with @pads LOCK held, such as |
|
804 * in the callback. |
|
805 * |
|
806 * Returns: The maximum number of bytes queued on all pads. This function |
|
807 * returns 0 if a pad has no queued buffer. |
|
808 * |
|
809 * MT safe. |
|
810 */ |
|
811 /* FIXME, we can do this in the _chain functions */ |
|
812 #ifdef __SYMBIAN32__ |
|
813 EXPORT_C |
|
814 #endif |
|
815 |
|
816 guint |
|
817 gst_collect_pads_available (GstCollectPads * pads) |
|
818 { |
|
819 GSList *collected; |
|
820 guint result = G_MAXUINT; |
|
821 |
|
822 g_return_val_if_fail (pads != NULL, 0); |
|
823 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0); |
|
824 |
|
825 for (collected = pads->data; collected; collected = g_slist_next (collected)) { |
|
826 GstCollectData *pdata; |
|
827 GstBuffer *buffer; |
|
828 gint size; |
|
829 |
|
830 pdata = (GstCollectData *) collected->data; |
|
831 |
|
832 /* ignore pad with EOS */ |
|
833 if (G_UNLIKELY (pdata->abidata.ABI.eos)) { |
|
834 GST_DEBUG ("pad %p is EOS", pdata); |
|
835 continue; |
|
836 } |
|
837 |
|
838 /* an empty buffer without EOS is weird when we get here.. */ |
|
839 if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) { |
|
840 GST_WARNING ("pad %p has no buffer", pdata); |
|
841 goto not_filled; |
|
842 } |
|
843 |
|
844 /* this is the size left of the buffer */ |
|
845 size = GST_BUFFER_SIZE (buffer) - pdata->pos; |
|
846 GST_DEBUG ("pad %p has %d bytes left", pdata, size); |
|
847 |
|
848 /* need to return the min of all available data */ |
|
849 if (size < result) |
|
850 result = size; |
|
851 } |
|
852 /* nothing changed, all must be EOS then, return 0 */ |
|
853 if (G_UNLIKELY (result == G_MAXUINT)) |
|
854 result = 0; |
|
855 |
|
856 return result; |
|
857 |
|
858 not_filled: |
|
859 { |
|
860 return 0; |
|
861 } |
|
862 } |
|
863 |
|
864 /** |
|
865 * gst_collect_pads_read: |
|
866 * @pads: the collectspads to query |
|
867 * @data: the data to use |
|
868 * @bytes: a pointer to a byte array |
|
869 * @size: the number of bytes to read |
|
870 * |
|
871 * Get a pointer in @bytes where @size bytes can be read from the |
|
872 * given pad @data. |
|
873 * |
|
874 * This function should be called with @pads LOCK held, such as |
|
875 * in the callback. |
|
876 * |
|
877 * Returns: The number of bytes available for consumption in the |
|
878 * memory pointed to by @bytes. This can be less than @size and |
|
879 * is 0 if the pad is end-of-stream. |
|
880 * |
|
881 * MT safe. |
|
882 */ |
|
883 #ifdef __SYMBIAN32__ |
|
884 EXPORT_C |
|
885 #endif |
|
886 |
|
887 guint |
|
888 gst_collect_pads_read (GstCollectPads * pads, GstCollectData * data, |
|
889 guint8 ** bytes, guint size) |
|
890 { |
|
891 guint readsize; |
|
892 GstBuffer *buffer; |
|
893 |
|
894 g_return_val_if_fail (pads != NULL, 0); |
|
895 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0); |
|
896 g_return_val_if_fail (data != NULL, 0); |
|
897 g_return_val_if_fail (bytes != NULL, 0); |
|
898 |
|
899 /* no buffer, must be EOS */ |
|
900 if ((buffer = data->buffer) == NULL) |
|
901 return 0; |
|
902 |
|
903 readsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos); |
|
904 |
|
905 *bytes = GST_BUFFER_DATA (buffer) + data->pos; |
|
906 |
|
907 return readsize; |
|
908 } |
|
909 |
|
910 /** |
|
911 * gst_collect_pads_read_buffer: |
|
912 * @pads: the collectspads to query |
|
913 * @data: the data to use |
|
914 * @size: the number of bytes to read |
|
915 * |
|
916 * Get a subbuffer of @size bytes from the given pad @data. |
|
917 * |
|
918 * This function should be called with @pads LOCK held, such as in the callback. |
|
919 * |
|
920 * Since: 0.10.18 |
|
921 * |
|
922 * Returns: A sub buffer. The size of the buffer can be less than requested. |
|
923 * A return of NULL signals that the pad is end-of-stream. |
|
924 * Unref the buffer after use. |
|
925 * |
|
926 * MT safe. |
|
927 */ |
|
928 #ifdef __SYMBIAN32__ |
|
929 EXPORT_C |
|
930 #endif |
|
931 |
|
932 GstBuffer * |
|
933 gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data, |
|
934 guint size) |
|
935 { |
|
936 guint readsize; |
|
937 GstBuffer *buffer; |
|
938 |
|
939 g_return_val_if_fail (pads != NULL, NULL); |
|
940 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL); |
|
941 g_return_val_if_fail (data != NULL, NULL); |
|
942 |
|
943 /* no buffer, must be EOS */ |
|
944 if ((buffer = data->buffer) == NULL) |
|
945 return NULL; |
|
946 |
|
947 readsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos); |
|
948 |
|
949 return gst_buffer_create_sub (buffer, data->pos, readsize); |
|
950 } |
|
951 |
|
952 /** |
|
953 * gst_collect_pads_take_buffer: |
|
954 * @pads: the collectspads to query |
|
955 * @data: the data to use |
|
956 * @size: the number of bytes to read |
|
957 * |
|
958 * Get a subbuffer of @size bytes from the given pad @data. Flushes the amount |
|
959 * of read bytes. |
|
960 * |
|
961 * This function should be called with @pads LOCK held, such as in the callback. |
|
962 * |
|
963 * Since: 0.10.18 |
|
964 * |
|
965 * Returns: A sub buffer. The size of the buffer can be less than requested. |
|
966 * A return of NULL signals that the pad is end-of-stream. |
|
967 * Unref the buffer after use. |
|
968 * |
|
969 * MT safe. |
|
970 */ |
|
971 #ifdef __SYMBIAN32__ |
|
972 EXPORT_C |
|
973 #endif |
|
974 |
|
975 GstBuffer * |
|
976 gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data, |
|
977 guint size) |
|
978 { |
|
979 GstBuffer *buffer = gst_collect_pads_read_buffer (pads, data, size); |
|
980 |
|
981 if (buffer) { |
|
982 gst_collect_pads_flush (pads, data, GST_BUFFER_SIZE (buffer)); |
|
983 } |
|
984 return buffer; |
|
985 } |
|
986 |
|
987 /** |
|
988 * gst_collect_pads_flush: |
|
989 * @pads: the collectspads to query |
|
990 * @data: the data to use |
|
991 * @size: the number of bytes to flush |
|
992 * |
|
993 * Flush @size bytes from the pad @data. |
|
994 * |
|
995 * This function should be called with @pads LOCK held, such as |
|
996 * in the callback. |
|
997 * |
|
998 * Returns: The number of bytes flushed. This can be less than @size and |
|
999 * is 0 if the pad was end-of-stream. |
|
1000 * |
|
1001 * MT safe. |
|
1002 */ |
|
1003 #ifdef __SYMBIAN32__ |
|
1004 EXPORT_C |
|
1005 #endif |
|
1006 |
|
1007 guint |
|
1008 gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data, |
|
1009 guint size) |
|
1010 { |
|
1011 guint flushsize; |
|
1012 GstBuffer *buffer; |
|
1013 |
|
1014 g_return_val_if_fail (pads != NULL, 0); |
|
1015 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0); |
|
1016 g_return_val_if_fail (data != NULL, 0); |
|
1017 |
|
1018 /* no buffer, must be EOS */ |
|
1019 if ((buffer = data->buffer) == NULL) |
|
1020 return 0; |
|
1021 |
|
1022 /* this is what we can flush at max */ |
|
1023 flushsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos); |
|
1024 |
|
1025 data->pos += size; |
|
1026 |
|
1027 if (data->pos >= GST_BUFFER_SIZE (buffer)) |
|
1028 /* _clear will also reset data->pos to 0 */ |
|
1029 gst_collect_pads_clear (pads, data); |
|
1030 |
|
1031 return flushsize; |
|
1032 } |
|
1033 |
|
1034 /* see if pads were added or removed and update our stats. Any pad |
|
1035 * added after releasing the PAD_LOCK will get collected in the next |
|
1036 * round. |
|
1037 * |
|
1038 * We can do a quick check by checking the cookies, that get changed |
|
1039 * whenever the pad list is updated. |
|
1040 * |
|
1041 * Must be called with LOCK. |
|
1042 */ |
|
1043 static void |
|
1044 gst_collect_pads_check_pads (GstCollectPads * pads) |
|
1045 { |
|
1046 /* the master list and cookie are protected with the PAD_LOCK */ |
|
1047 GST_COLLECT_PADS_PAD_LOCK (pads); |
|
1048 if (G_UNLIKELY (pads->abidata.ABI.pad_cookie != pads->cookie)) { |
|
1049 GSList *collected; |
|
1050 |
|
1051 /* clear list and stats */ |
|
1052 g_slist_foreach (pads->data, (GFunc) unref_data, NULL); |
|
1053 g_slist_free (pads->data); |
|
1054 pads->data = NULL; |
|
1055 pads->numpads = 0; |
|
1056 pads->queuedpads = 0; |
|
1057 pads->eospads = 0; |
|
1058 |
|
1059 /* loop over the master pad list */ |
|
1060 collected = pads->abidata.ABI.pad_list; |
|
1061 for (; collected; collected = g_slist_next (collected)) { |
|
1062 GstCollectData *data; |
|
1063 |
|
1064 /* update the stats */ |
|
1065 pads->numpads++; |
|
1066 data = collected->data; |
|
1067 if (data->buffer) |
|
1068 pads->queuedpads++; |
|
1069 if (data->abidata.ABI.eos) |
|
1070 pads->eospads++; |
|
1071 |
|
1072 /* add to the list of pads to collect */ |
|
1073 ref_data (data); |
|
1074 pads->data = g_slist_prepend (pads->data, data); |
|
1075 } |
|
1076 /* and update the cookie */ |
|
1077 pads->cookie = pads->abidata.ABI.pad_cookie; |
|
1078 } |
|
1079 GST_COLLECT_PADS_PAD_UNLOCK (pads); |
|
1080 } |
|
1081 |
|
1082 /* checks if all the pads are collected and call the collectfunction |
|
1083 * |
|
1084 * Should be called with LOCK. |
|
1085 * |
|
1086 * Returns: The #GstFlowReturn of collection. |
|
1087 */ |
|
1088 static GstFlowReturn |
|
1089 gst_collect_pads_check_collected (GstCollectPads * pads) |
|
1090 { |
|
1091 GstFlowReturn flow_ret = GST_FLOW_OK; |
|
1092 |
|
1093 g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR); |
|
1094 g_return_val_if_fail (pads->func != NULL, GST_FLOW_NOT_SUPPORTED); |
|
1095 |
|
1096 /* check for new pads, update stats etc.. */ |
|
1097 gst_collect_pads_check_pads (pads); |
|
1098 |
|
1099 if (G_UNLIKELY (pads->eospads == pads->numpads)) { |
|
1100 /* If all our pads are EOS just collect once to let the element |
|
1101 * do its final EOS handling. */ |
|
1102 GST_DEBUG ("All active pads (%d) are EOS, calling %s", |
|
1103 pads->numpads, GST_DEBUG_FUNCPTR_NAME (pads->func)); |
|
1104 flow_ret = pads->func (pads, pads->user_data); |
|
1105 } else { |
|
1106 gboolean collected = FALSE; |
|
1107 |
|
1108 /* We call the collected function as long as our condition matches. |
|
1109 * FIXME: should we error out if the collect function did not pop anything ? |
|
1110 * we can get a busy loop here if the element does not pop from the collect |
|
1111 * function |
|
1112 */ |
|
1113 while (((pads->queuedpads + pads->eospads) >= pads->numpads)) { |
|
1114 GST_DEBUG ("All active pads (%d + %d >= %d) have data, calling %s", |
|
1115 pads->queuedpads, pads->eospads, pads->numpads, |
|
1116 GST_DEBUG_FUNCPTR_NAME (pads->func)); |
|
1117 flow_ret = pads->func (pads, pads->user_data); |
|
1118 collected = TRUE; |
|
1119 |
|
1120 /* break on error */ |
|
1121 if (flow_ret != GST_FLOW_OK) |
|
1122 break; |
|
1123 /* Don't keep looping after telling the element EOS or flushing */ |
|
1124 if (pads->queuedpads == 0) |
|
1125 break; |
|
1126 } |
|
1127 if (!collected) |
|
1128 GST_DEBUG ("Not all active pads (%d) have data, continuing", |
|
1129 pads->numpads); |
|
1130 } |
|
1131 return flow_ret; |
|
1132 } |
|
1133 |
|
1134 static gboolean |
|
1135 gst_collect_pads_event (GstPad * pad, GstEvent * event) |
|
1136 { |
|
1137 gboolean res; |
|
1138 GstCollectData *data; |
|
1139 GstCollectPads *pads; |
|
1140 |
|
1141 /* some magic to get the managing collect_pads */ |
|
1142 GST_OBJECT_LOCK (pad); |
|
1143 data = (GstCollectData *) gst_pad_get_element_private (pad); |
|
1144 if (G_UNLIKELY (data == NULL)) |
|
1145 goto pad_removed; |
|
1146 ref_data (data); |
|
1147 GST_OBJECT_UNLOCK (pad); |
|
1148 |
|
1149 res = TRUE; |
|
1150 |
|
1151 pads = data->collect; |
|
1152 |
|
1153 GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event), |
|
1154 GST_DEBUG_PAD_NAME (data->pad)); |
|
1155 |
|
1156 switch (GST_EVENT_TYPE (event)) { |
|
1157 case GST_EVENT_FLUSH_START: |
|
1158 { |
|
1159 /* forward event to unblock check_collected */ |
|
1160 gst_pad_event_default (pad, event); |
|
1161 |
|
1162 /* now unblock the chain function. |
|
1163 * no cond per pad, so they all unblock, |
|
1164 * non-flushing block again */ |
|
1165 GST_OBJECT_LOCK (pads); |
|
1166 data->abidata.ABI.flushing = TRUE; |
|
1167 gst_collect_pads_clear (pads, data); |
|
1168 GST_OBJECT_UNLOCK (pads); |
|
1169 |
|
1170 /* event already cleaned up by forwarding */ |
|
1171 goto done; |
|
1172 } |
|
1173 case GST_EVENT_FLUSH_STOP: |
|
1174 { |
|
1175 /* flush the 1 buffer queue */ |
|
1176 GST_OBJECT_LOCK (pads); |
|
1177 data->abidata.ABI.flushing = FALSE; |
|
1178 gst_collect_pads_clear (pads, data); |
|
1179 /* we need new segment info after the flush */ |
|
1180 gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED); |
|
1181 data->abidata.ABI.new_segment = FALSE; |
|
1182 /* if the pad was EOS, remove the EOS flag and |
|
1183 * decrement the number of eospads */ |
|
1184 if (G_UNLIKELY (data->abidata.ABI.eos == TRUE)) { |
|
1185 pads->eospads--; |
|
1186 data->abidata.ABI.eos = FALSE; |
|
1187 } |
|
1188 GST_OBJECT_UNLOCK (pads); |
|
1189 |
|
1190 /* forward event */ |
|
1191 goto forward; |
|
1192 } |
|
1193 case GST_EVENT_EOS: |
|
1194 { |
|
1195 GST_OBJECT_LOCK (pads); |
|
1196 /* if the pad was not EOS, make it EOS and so we |
|
1197 * have one more eospad */ |
|
1198 if (G_LIKELY (data->abidata.ABI.eos == FALSE)) { |
|
1199 data->abidata.ABI.eos = TRUE; |
|
1200 pads->eospads++; |
|
1201 } |
|
1202 /* check if we need collecting anything, we ignore the |
|
1203 * result. */ |
|
1204 gst_collect_pads_check_collected (pads); |
|
1205 GST_OBJECT_UNLOCK (pads); |
|
1206 |
|
1207 /* We eat this event, element should do something |
|
1208 * in the collected callback. */ |
|
1209 gst_event_unref (event); |
|
1210 goto done; |
|
1211 } |
|
1212 case GST_EVENT_NEWSEGMENT: |
|
1213 { |
|
1214 gint64 start, stop, time; |
|
1215 gdouble rate, arate; |
|
1216 GstFormat format; |
|
1217 gboolean update; |
|
1218 |
|
1219 gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format, |
|
1220 &start, &stop, &time); |
|
1221 |
|
1222 GST_DEBUG_OBJECT (data->pad, "got newsegment, start %" GST_TIME_FORMAT |
|
1223 ", stop %" GST_TIME_FORMAT, GST_TIME_ARGS (start), |
|
1224 GST_TIME_ARGS (stop)); |
|
1225 |
|
1226 gst_segment_set_newsegment_full (&data->segment, update, rate, arate, |
|
1227 format, start, stop, time); |
|
1228 |
|
1229 data->abidata.ABI.new_segment = TRUE; |
|
1230 |
|
1231 /* we must not forward this event since multiple segments will be |
|
1232 * accumulated and this is certainly not what we want. */ |
|
1233 gst_event_unref (event); |
|
1234 /* FIXME: collect-pads based elements need to create their own newsegment |
|
1235 * event (and only one really) |
|
1236 * (a) make the segment part of the GstCollectData structure of each pad, |
|
1237 * so you can just check that once you have a buffer queued on that pad, |
|
1238 * (b) you can override a pad's event function with your own, |
|
1239 * catch the newsegment event and then pass it on to the original |
|
1240 * gstcollectpads event function |
|
1241 * (that's what avimux does for something IIRC) |
|
1242 * see #340060 |
|
1243 */ |
|
1244 goto done; |
|
1245 } |
|
1246 default: |
|
1247 /* forward other events */ |
|
1248 goto forward; |
|
1249 } |
|
1250 |
|
1251 forward: |
|
1252 res = gst_pad_event_default (pad, event); |
|
1253 |
|
1254 done: |
|
1255 unref_data (data); |
|
1256 return res; |
|
1257 |
|
1258 /* ERRORS */ |
|
1259 pad_removed: |
|
1260 { |
|
1261 GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad)); |
|
1262 GST_OBJECT_UNLOCK (pad); |
|
1263 return FALSE; |
|
1264 } |
|
1265 } |
|
1266 |
|
1267 /* For each buffer we receive we check if our collected condition is reached |
|
1268 * and if so we call the collected function. When this is done we check if |
|
1269 * data has been unqueued. If data is still queued we wait holding the stream |
|
1270 * lock to make sure no EOS event can happen while we are ready to be |
|
1271 * collected |
|
1272 */ |
|
1273 static GstFlowReturn |
|
1274 gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer) |
|
1275 { |
|
1276 GstCollectData *data; |
|
1277 GstCollectPads *pads; |
|
1278 guint64 size; |
|
1279 GstFlowReturn ret; |
|
1280 GstBuffer **buffer_p; |
|
1281 |
|
1282 GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad)); |
|
1283 |
|
1284 /* some magic to get the managing collect_pads */ |
|
1285 GST_OBJECT_LOCK (pad); |
|
1286 data = (GstCollectData *) gst_pad_get_element_private (pad); |
|
1287 if (G_UNLIKELY (data == NULL)) |
|
1288 goto no_data; |
|
1289 ref_data (data); |
|
1290 GST_OBJECT_UNLOCK (pad); |
|
1291 |
|
1292 pads = data->collect; |
|
1293 size = GST_BUFFER_SIZE (buffer); |
|
1294 |
|
1295 GST_OBJECT_LOCK (pads); |
|
1296 /* if not started, bail out */ |
|
1297 if (G_UNLIKELY (!pads->started)) |
|
1298 goto not_started; |
|
1299 /* check if this pad is flushing */ |
|
1300 if (G_UNLIKELY (data->abidata.ABI.flushing)) |
|
1301 goto flushing; |
|
1302 /* pad was EOS, we can refuse this data */ |
|
1303 if (G_UNLIKELY (data->abidata.ABI.eos)) |
|
1304 goto unexpected; |
|
1305 |
|
1306 GST_DEBUG ("Queuing buffer %p for pad %s:%s", buffer, |
|
1307 GST_DEBUG_PAD_NAME (pad)); |
|
1308 |
|
1309 /* One more pad has data queued */ |
|
1310 pads->queuedpads++; |
|
1311 buffer_p = &data->buffer; |
|
1312 gst_buffer_replace (buffer_p, buffer); |
|
1313 |
|
1314 /* update segment last position if in TIME */ |
|
1315 if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) { |
|
1316 GstClockTime timestamp = GST_BUFFER_TIMESTAMP (buffer); |
|
1317 |
|
1318 if (GST_CLOCK_TIME_IS_VALID (timestamp)) |
|
1319 gst_segment_set_last_stop (&data->segment, GST_FORMAT_TIME, timestamp); |
|
1320 } |
|
1321 |
|
1322 /* While we have data queued on this pad try to collect stuff */ |
|
1323 do { |
|
1324 /* Check if our collected condition is matched and call the collected function |
|
1325 * if it is */ |
|
1326 ret = gst_collect_pads_check_collected (pads); |
|
1327 /* when an error occurs, we want to report this back to the caller ASAP |
|
1328 * without having to block if the buffer was not popped */ |
|
1329 if (G_UNLIKELY (ret != GST_FLOW_OK)) |
|
1330 goto error; |
|
1331 |
|
1332 /* data was consumed, we can exit and accept new data */ |
|
1333 if (data->buffer == NULL) |
|
1334 break; |
|
1335 |
|
1336 /* Check if we got removed in the mean time, FIXME, this is racy. |
|
1337 * Between this check and the _WAIT, the pad could be removed which will |
|
1338 * makes us hang in the _WAIT. */ |
|
1339 GST_OBJECT_LOCK (pad); |
|
1340 if (G_UNLIKELY (gst_pad_get_element_private (pad) == NULL)) |
|
1341 goto pad_removed; |
|
1342 GST_OBJECT_UNLOCK (pad); |
|
1343 |
|
1344 GST_DEBUG ("Pad %s:%s has a buffer queued, waiting", |
|
1345 GST_DEBUG_PAD_NAME (pad)); |
|
1346 |
|
1347 /* wait to be collected, this must happen from another thread triggered |
|
1348 * by the _chain function of another pad. We release the lock so we |
|
1349 * can get stopped or flushed as well. We can however not get EOS |
|
1350 * because we still hold the STREAM_LOCK. |
|
1351 */ |
|
1352 GST_COLLECT_PADS_WAIT (pads); |
|
1353 |
|
1354 GST_DEBUG ("Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad)); |
|
1355 |
|
1356 /* after a signal, we could be stopped */ |
|
1357 if (G_UNLIKELY (!pads->started)) |
|
1358 goto not_started; |
|
1359 /* check if this pad is flushing */ |
|
1360 if (G_UNLIKELY (data->abidata.ABI.flushing)) |
|
1361 goto flushing; |
|
1362 } |
|
1363 while (data->buffer != NULL); |
|
1364 |
|
1365 unlock_done: |
|
1366 GST_OBJECT_UNLOCK (pads); |
|
1367 unref_data (data); |
|
1368 gst_buffer_unref (buffer); |
|
1369 return ret; |
|
1370 |
|
1371 pad_removed: |
|
1372 { |
|
1373 GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad)); |
|
1374 GST_OBJECT_UNLOCK (pad); |
|
1375 ret = GST_FLOW_NOT_LINKED; |
|
1376 goto unlock_done; |
|
1377 } |
|
1378 /* ERRORS */ |
|
1379 no_data: |
|
1380 { |
|
1381 GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad)); |
|
1382 GST_OBJECT_UNLOCK (pad); |
|
1383 gst_buffer_unref (buffer); |
|
1384 return GST_FLOW_NOT_LINKED; |
|
1385 } |
|
1386 not_started: |
|
1387 { |
|
1388 GST_DEBUG ("not started"); |
|
1389 gst_collect_pads_clear (pads, data); |
|
1390 ret = GST_FLOW_WRONG_STATE; |
|
1391 goto unlock_done; |
|
1392 } |
|
1393 flushing: |
|
1394 { |
|
1395 GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad)); |
|
1396 gst_collect_pads_clear (pads, data); |
|
1397 ret = GST_FLOW_WRONG_STATE; |
|
1398 goto unlock_done; |
|
1399 } |
|
1400 unexpected: |
|
1401 { |
|
1402 /* we should not post an error for this, just inform upstream that |
|
1403 * we don't expect anything anymore */ |
|
1404 GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad)); |
|
1405 ret = GST_FLOW_UNEXPECTED; |
|
1406 goto unlock_done; |
|
1407 } |
|
1408 error: |
|
1409 { |
|
1410 /* we print the error, the element should post a reasonable error |
|
1411 * message for fatal errors */ |
|
1412 GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret)); |
|
1413 gst_collect_pads_clear (pads, data); |
|
1414 goto unlock_done; |
|
1415 } |
|
1416 } |