changeset 8 | 4a7fac7dd34a |
parent 0 | 0e761a78d257 |
child 30 | 7e817e7e631c |
7:71e347f905f2 | 8:4a7fac7dd34a |
---|---|
21 * Boston, MA 02111-1307, USA. |
21 * Boston, MA 02111-1307, USA. |
22 */ |
22 */ |
23 |
23 |
24 /** |
24 /** |
25 * SECTION:element-multiqueue |
25 * SECTION:element-multiqueue |
26 * @short_description: Asynchronous data queues |
|
27 * @see_also: #GstQueue |
26 * @see_also: #GstQueue |
28 * |
27 * |
29 * <refsect2> |
28 * <refsect2> |
30 * <para> |
29 * <para> |
31 * Multiqueue is similar to a normal #GstQueue with the following additional |
30 * Multiqueue is similar to a normal #GstQueue with the following additional |
33 * <orderedlist> |
32 * <orderedlist> |
34 * <listitem> |
33 * <listitem> |
35 * <itemizedlist><title>Multiple streamhandling</title> |
34 * <itemizedlist><title>Multiple streamhandling</title> |
36 * <listitem><para> |
35 * <listitem><para> |
37 * The element handles queueing data on more than one stream at once. To |
36 * The element handles queueing data on more than one stream at once. To |
38 * achieve such a feature it has request sink pads (sink_%d) and |
37 * achieve such a feature it has request sink pads (sink%d) and |
39 * 'sometimes' src pads (src_%d). |
38 * 'sometimes' src pads (src%d). |
40 * </para><para> |
39 * </para><para> |
41 * When requesting a given sinkpad with gst_element_get_request_pad(), |
40 * When requesting a given sinkpad with gst_element_get_request_pad(), |
42 * the associated srcpad for that stream will be created. |
41 * the associated srcpad for that stream will be created. |
43 * Ex: requesting sink_1 will generate src_1. |
42 * Example: requesting sink1 will generate src1. |
44 * </para></listitem> |
43 * </para></listitem> |
45 * </itemizedlist> |
44 * </itemizedlist> |
46 * </listitem> |
45 * </listitem> |
47 * <listitem> |
46 * <listitem> |
48 * <itemizedlist><title>Non-starvation on multiple streams</title> |
47 * <itemizedlist><title>Non-starvation on multiple streams</title> |
110 #endif |
109 #endif |
111 |
110 |
112 #include <gst/gst.h> |
111 #include <gst/gst.h> |
113 #include "gstmultiqueue.h" |
112 #include "gstmultiqueue.h" |
114 |
113 |
115 #ifdef __SYMBIAN32__ |
|
116 #include <glib_global.h> |
|
117 #include <gobject_global.h> |
|
118 #endif |
|
119 |
|
120 /** |
114 /** |
121 * GstSingleQueue: |
115 * GstSingleQueue: |
122 * @sinkpad: associated sink #GstPad |
116 * @sinkpad: associated sink #GstPad |
123 * @srcpad: associated source #GstPad |
117 * @srcpad: associated source #GstPad |
124 * |
118 * |
173 static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue); |
167 static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue); |
174 static void gst_single_queue_free (GstSingleQueue * squeue); |
168 static void gst_single_queue_free (GstSingleQueue * squeue); |
175 |
169 |
176 static void wake_up_next_non_linked (GstMultiQueue * mq); |
170 static void wake_up_next_non_linked (GstMultiQueue * mq); |
177 static void compute_high_id (GstMultiQueue * mq); |
171 static void compute_high_id (GstMultiQueue * mq); |
172 static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq); |
|
178 |
173 |
179 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d", |
174 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d", |
180 GST_PAD_SINK, |
175 GST_PAD_SINK, |
181 GST_PAD_REQUEST, |
176 GST_PAD_REQUEST, |
182 GST_STATIC_CAPS_ANY); |
177 GST_STATIC_CAPS_ANY); |
275 GST_DEBUG_FUNCPTR (gst_multi_queue_set_property); |
270 GST_DEBUG_FUNCPTR (gst_multi_queue_set_property); |
276 gobject_class->get_property = |
271 gobject_class->get_property = |
277 GST_DEBUG_FUNCPTR (gst_multi_queue_get_property); |
272 GST_DEBUG_FUNCPTR (gst_multi_queue_get_property); |
278 |
273 |
279 /* SIGNALS */ |
274 /* SIGNALS */ |
275 |
|
276 /** |
|
277 * GstMultiQueue::underrun: |
|
278 * @multiqueue: the multiqueue instance |
|
279 * |
|
280 * This signal is emitted from the streaming thread when there is |
|
281 * no data in any of the queues inside the multiqueue instance (underrun). |
|
282 * |
|
283 * This indicates either starvation or EOS from the upstream data sources. |
|
284 */ |
|
280 gst_multi_queue_signals[SIGNAL_UNDERRUN] = |
285 gst_multi_queue_signals[SIGNAL_UNDERRUN] = |
281 g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, |
286 g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, |
282 G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL, |
287 G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL, |
283 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); |
288 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); |
284 |
289 |
290 /** |
|
291 * GstMultiQueue::overrun: |
|
292 * @multiqueue: the multiqueue instance |
|
293 * |
|
294 * Reports that one of the queues in the multiqueue is full (overrun). |
|
295 * A queue is full if the total amount of data inside it (num-buffers, time, |
|
296 * size) is higher than the boundary values which can be set through the |
|
297 * GObject properties. |
|
298 * |
|
299 * This can be used as an indicator of pre-roll. |
|
300 */ |
|
285 gst_multi_queue_signals[SIGNAL_OVERRUN] = |
301 gst_multi_queue_signals[SIGNAL_OVERRUN] = |
286 g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, |
302 g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, |
287 G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL, |
303 G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL, |
288 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); |
304 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); |
289 |
305 |
290 /* PROPERTIES */ |
306 /* PROPERTIES */ |
291 |
307 |
292 g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES, |
308 g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES, |
293 g_param_spec_uint ("max-size-bytes", "Max. size (kB)", |
309 g_param_spec_uint ("max-size-bytes", "Max. size (kB)", |
294 "Max. amount of data in the queue (bytes, 0=disable)", |
310 "Max. amount of data in the queue (bytes, 0=disable)", |
295 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE)); |
311 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, |
312 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
296 g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS, |
313 g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS, |
297 g_param_spec_uint ("max-size-buffers", "Max. size (buffers)", |
314 g_param_spec_uint ("max-size-buffers", "Max. size (buffers)", |
298 "Max. number of buffers in the queue (0=disable)", |
315 "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT, |
299 0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE)); |
316 DEFAULT_MAX_SIZE_BUFFERS, |
317 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
300 g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME, |
318 g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME, |
301 g_param_spec_uint64 ("max-size-time", "Max. size (ns)", |
319 g_param_spec_uint64 ("max-size-time", "Max. size (ns)", |
302 "Max. amount of data in the queue (in ns, 0=disable)", |
320 "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64, |
303 0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE)); |
321 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
304 |
322 |
305 g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES, |
323 g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES, |
306 g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)", |
324 g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)", |
307 "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)", |
325 "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)", |
308 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, G_PARAM_READWRITE)); |
326 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, |
327 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
309 g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS, |
328 g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS, |
310 g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)", |
329 g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)", |
311 "Amount of buffers the queues can grow if one of them is empty (0=disable)", |
330 "Amount of buffers the queues can grow if one of them is empty (0=disable)", |
312 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, G_PARAM_READWRITE)); |
331 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, |
332 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
313 g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME, |
333 g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME, |
314 g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)", |
334 g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)", |
315 "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)", |
335 "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)", |
316 0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, G_PARAM_READWRITE)); |
336 0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, |
337 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
317 |
338 |
318 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize); |
339 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize); |
319 |
340 |
320 gstelement_class->request_new_pad = |
341 gstelement_class->request_new_pad = |
321 GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad); |
342 GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad); |
603 gint64 sink_time, src_time; |
624 gint64 sink_time, src_time; |
604 |
625 |
605 sink_time = |
626 sink_time = |
606 gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME, |
627 gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME, |
607 sq->sink_segment.last_stop); |
628 sq->sink_segment.last_stop); |
629 if (sink_time == GST_CLOCK_TIME_NONE) |
|
630 goto beach; |
|
608 |
631 |
609 src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME, |
632 src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME, |
610 sq->src_segment.last_stop); |
633 sq->src_segment.last_stop); |
634 if (src_time == GST_CLOCK_TIME_NONE) |
|
635 goto beach; |
|
611 |
636 |
612 GST_DEBUG_OBJECT (mq, |
637 GST_DEBUG_OBJECT (mq, |
613 "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id, |
638 "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id, |
614 GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time)); |
639 GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time)); |
615 |
640 |
616 /* This allows for streams with out of order timestamping - sometimes the |
641 /* This allows for streams with out of order timestamping - sometimes the |
617 * emerging timestamp is later than the arriving one(s) */ |
642 * emerging timestamp is later than the arriving one(s) */ |
618 if (sink_time >= src_time) |
643 if (sink_time < src_time) |
619 sq->cur_time = sink_time - src_time; |
644 goto beach; |
620 else |
645 |
621 sq->cur_time = 0; |
646 sq->cur_time = sink_time - src_time; |
647 return; |
|
648 |
|
649 beach: |
|
650 sq->cur_time = 0; |
|
622 } |
651 } |
623 |
652 |
624 /* take a NEWSEGMENT event and apply the values to segment, updating the time |
653 /* take a NEWSEGMENT event and apply the values to segment, updating the time |
625 * level of queue. */ |
654 * level of queue. */ |
626 static void |
655 static void |
765 static void |
794 static void |
766 gst_multi_queue_item_destroy (GstMultiQueueItem * item) |
795 gst_multi_queue_item_destroy (GstMultiQueueItem * item) |
767 { |
796 { |
768 if (item->object) |
797 if (item->object) |
769 gst_mini_object_unref (item->object); |
798 gst_mini_object_unref (item->object); |
770 g_free (item); |
799 g_slice_free (GstMultiQueueItem, item); |
771 } |
800 } |
772 |
801 |
773 /* takes ownership of passed mini object! */ |
802 /* takes ownership of passed mini object! */ |
774 static GstMultiQueueItem * |
803 static GstMultiQueueItem * |
775 gst_multi_queue_item_new (GstMiniObject * object, guint32 curid) |
804 gst_multi_queue_item_new (GstMiniObject * object, guint32 curid) |
776 { |
805 { |
777 GstMultiQueueItem *item; |
806 GstMultiQueueItem *item; |
778 |
807 |
779 item = g_new (GstMultiQueueItem, 1); |
808 item = g_slice_new (GstMultiQueueItem); |
780 item->object = object; |
809 item->object = object; |
781 item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy; |
810 item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy; |
782 item->posid = curid; |
811 item->posid = curid; |
783 |
812 |
784 if (GST_IS_BUFFER (object)) { |
813 if (GST_IS_BUFFER (object)) { |
1060 * a lock, the _check_full happens from this thread only, right before pushing |
1089 * a lock, the _check_full happens from this thread only, right before pushing |
1061 * into dataqueue. */ |
1090 * into dataqueue. */ |
1062 switch (type) { |
1091 switch (type) { |
1063 case GST_EVENT_EOS: |
1092 case GST_EVENT_EOS: |
1064 sq->is_eos = TRUE; |
1093 sq->is_eos = TRUE; |
1094 single_queue_overrun_cb (sq->queue, sq); |
|
1065 break; |
1095 break; |
1066 case GST_EVENT_NEWSEGMENT: |
1096 case GST_EVENT_NEWSEGMENT: |
1067 apply_segment (mq, sq, sref, &sq->sink_segment); |
1097 apply_segment (mq, sq, sref, &sq->sink_segment); |
1068 gst_event_unref (sref); |
1098 gst_event_unref (sref); |
1069 break; |
1099 break; |
1265 GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id); |
1295 GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id); |
1266 |
1296 |
1267 if (gst_data_queue_is_empty (ssq->queue)) { |
1297 if (gst_data_queue_is_empty (ssq->queue)) { |
1268 GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id); |
1298 GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id); |
1269 if (IS_FILLED (visible, size.visible)) { |
1299 if (IS_FILLED (visible, size.visible)) { |
1270 sq->max_size.visible++; |
1300 sq->max_size.visible = size.visible + 1; |
1271 GST_DEBUG_OBJECT (mq, |
1301 GST_DEBUG_OBJECT (mq, |
1272 "Another queue is empty, bumping single queue %d max visible to %d", |
1302 "Another queue is empty, bumping single queue %d max visible to %d", |
1273 sq->id, sq->max_size.visible); |
1303 sq->id, sq->max_size.visible); |
1274 } |
1304 } |
1275 GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); |
1305 GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); |
1282 "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%" |
1312 "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%" |
1283 G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible, |
1313 G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible, |
1284 ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time); |
1314 ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time); |
1285 |
1315 |
1286 /* if this queue is filled completely we must signal overrun */ |
1316 /* if this queue is filled completely we must signal overrun */ |
1287 if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, sq->cur_time)) { |
1317 if (sq->is_eos || IS_FILLED (bytes, ssize.bytes) || |
1318 IS_FILLED (time, sq->cur_time)) { |
|
1288 GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id); |
1319 GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id); |
1289 filled = TRUE; |
1320 filled = TRUE; |
1290 } |
1321 } |
1291 } |
1322 } |
1292 /* no queues were empty */ |
1323 /* no queues were empty */ |
1319 if (gst_data_queue_is_full (sq->queue)) { |
1350 if (gst_data_queue_is_full (sq->queue)) { |
1320 GstDataQueueSize size; |
1351 GstDataQueueSize size; |
1321 |
1352 |
1322 gst_data_queue_get_level (sq->queue, &size); |
1353 gst_data_queue_get_level (sq->queue, &size); |
1323 if (IS_FILLED (visible, size.visible)) { |
1354 if (IS_FILLED (visible, size.visible)) { |
1324 sq->max_size.visible++; |
1355 sq->max_size.visible = size.visible + 1; |
1325 GST_DEBUG_OBJECT (mq, |
1356 GST_DEBUG_OBJECT (mq, |
1326 "queue %d is filled, bumping its max visible to %d", sq->id, |
1357 "queue %d is filled, bumping its max visible to %d", sq->id, |
1327 sq->max_size.visible); |
1358 sq->max_size.visible); |
1328 gst_data_queue_limits_changed (sq->queue); |
1359 gst_data_queue_limits_changed (sq->queue); |
1329 } |
1360 } |
1355 |
1386 |
1356 /* we never go past the max visible items */ |
1387 /* we never go past the max visible items */ |
1357 if (IS_FILLED (visible, visible)) |
1388 if (IS_FILLED (visible, visible)) |
1358 return TRUE; |
1389 return TRUE; |
1359 |
1390 |
1360 if (sq->cur_time != 0) { |
1391 /* check time or bytes */ |
1361 /* if we have valid time in the queue, check */ |
1392 res = IS_FILLED (time, sq->cur_time) || IS_FILLED (bytes, bytes); |
1362 res = IS_FILLED (time, sq->cur_time); |
1393 |
1363 } else { |
|
1364 /* no valid time, check bytes */ |
|
1365 res = IS_FILLED (bytes, bytes); |
|
1366 } |
|
1367 return res; |
1394 return res; |
1368 } |
1395 } |
1369 |
1396 |
1370 static void |
1397 static void |
1371 gst_single_queue_free (GstSingleQueue * sq) |
1398 gst_single_queue_free (GstSingleQueue * sq) |
1453 GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links)); |
1480 GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links)); |
1454 |
1481 |
1455 gst_pad_set_element_private (sq->sinkpad, (gpointer) sq); |
1482 gst_pad_set_element_private (sq->sinkpad, (gpointer) sq); |
1456 gst_pad_set_element_private (sq->srcpad, (gpointer) sq); |
1483 gst_pad_set_element_private (sq->srcpad, (gpointer) sq); |
1457 |
1484 |
1458 gst_pad_set_active (sq->srcpad, TRUE); |
1485 /* only activate the pads when we are not in the NULL state |
1486 * and add the pad under the state_lock to prevent state changes |
|
1487 * between activating and adding */ |
|
1488 g_static_rec_mutex_lock (GST_STATE_GET_LOCK (mqueue)); |
|
1489 if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) { |
|
1490 gst_pad_set_active (sq->srcpad, TRUE); |
|
1491 gst_pad_set_active (sq->sinkpad, TRUE); |
|
1492 } |
|
1459 gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad); |
1493 gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad); |
1460 |
|
1461 gst_pad_set_active (sq->sinkpad, TRUE); |
|
1462 gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad); |
1494 gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad); |
1495 g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (mqueue)); |
|
1463 |
1496 |
1464 GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added", |
1497 GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added", |
1465 sq->id); |
1498 sq->id); |
1466 |
1499 |
1467 return sq; |
1500 return sq; |