branch | RCL_3 |
changeset 30 | 7e817e7e631c |
parent 29 | 567bb019e3e3 |
29:567bb019e3e3 | 30:7e817e7e631c |
---|---|
21 * Boston, MA 02111-1307, USA. |
21 * Boston, MA 02111-1307, USA. |
22 */ |
22 */ |
23 |
23 |
24 /** |
24 /** |
25 * SECTION:element-multiqueue |
25 * SECTION:element-multiqueue |
26 * @short_description: Asynchronous data queues |
|
26 * @see_also: #GstQueue |
27 * @see_also: #GstQueue |
27 * |
28 * |
28 * <refsect2> |
29 * <refsect2> |
29 * <para> |
30 * <para> |
30 * Multiqueue is similar to a normal #GstQueue with the following additional |
31 * Multiqueue is similar to a normal #GstQueue with the following additional |
32 * <orderedlist> |
33 * <orderedlist> |
33 * <listitem> |
34 * <listitem> |
34 * <itemizedlist><title>Multiple streamhandling</title> |
35 * <itemizedlist><title>Multiple streamhandling</title> |
35 * <listitem><para> |
36 * <listitem><para> |
36 * The element handles queueing data on more than one stream at once. To |
37 * The element handles queueing data on more than one stream at once. To |
37 * achieve such a feature it has request sink pads (sink%d) and |
38 * achieve such a feature it has request sink pads (sink_%d) and |
38 * 'sometimes' src pads (src%d). |
39 * 'sometimes' src pads (src_%d). |
39 * </para><para> |
40 * </para><para> |
40 * When requesting a given sinkpad with gst_element_get_request_pad(), |
41 * When requesting a given sinkpad with gst_element_get_request_pad(), |
41 * the associated srcpad for that stream will be created. |
42 * the associated srcpad for that stream will be created. |
42 * Example: requesting sink1 will generate src1. |
43 * Ex: requesting sink_1 will generate src_1. |
43 * </para></listitem> |
44 * </para></listitem> |
44 * </itemizedlist> |
45 * </itemizedlist> |
45 * </listitem> |
46 * </listitem> |
46 * <listitem> |
47 * <listitem> |
47 * <itemizedlist><title>Non-starvation on multiple streams</title> |
48 * <itemizedlist><title>Non-starvation on multiple streams</title> |
109 #endif |
110 #endif |
110 |
111 |
111 #include <gst/gst.h> |
112 #include <gst/gst.h> |
112 #include "gstmultiqueue.h" |
113 #include "gstmultiqueue.h" |
113 |
114 |
115 #ifdef __SYMBIAN32__ |
|
116 #include <glib_global.h> |
|
117 #include <gobject_global.h> |
|
118 #endif |
|
119 |
|
114 /** |
120 /** |
115 * GstSingleQueue: |
121 * GstSingleQueue: |
116 * @sinkpad: associated sink #GstPad |
122 * @sinkpad: associated sink #GstPad |
117 * @srcpad: associated source #GstPad |
123 * @srcpad: associated source #GstPad |
118 * |
124 * |
167 static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue); |
173 static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue); |
168 static void gst_single_queue_free (GstSingleQueue * squeue); |
174 static void gst_single_queue_free (GstSingleQueue * squeue); |
169 |
175 |
170 static void wake_up_next_non_linked (GstMultiQueue * mq); |
176 static void wake_up_next_non_linked (GstMultiQueue * mq); |
171 static void compute_high_id (GstMultiQueue * mq); |
177 static void compute_high_id (GstMultiQueue * mq); |
172 static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq); |
|
173 |
178 |
174 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d", |
179 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d", |
175 GST_PAD_SINK, |
180 GST_PAD_SINK, |
176 GST_PAD_REQUEST, |
181 GST_PAD_REQUEST, |
177 GST_STATIC_CAPS_ANY); |
182 GST_STATIC_CAPS_ANY); |
270 GST_DEBUG_FUNCPTR (gst_multi_queue_set_property); |
275 GST_DEBUG_FUNCPTR (gst_multi_queue_set_property); |
271 gobject_class->get_property = |
276 gobject_class->get_property = |
272 GST_DEBUG_FUNCPTR (gst_multi_queue_get_property); |
277 GST_DEBUG_FUNCPTR (gst_multi_queue_get_property); |
273 |
278 |
274 /* SIGNALS */ |
279 /* SIGNALS */ |
275 |
|
276 /** |
|
277 * GstMultiQueue::underrun: |
|
278 * @multiqueue: the multqueue instance |
|
279 * |
|
280 * This signal is emitted from the streaming thread when there is |
|
281 * no data in any of the queues inside the multiqueue instance (underrun). |
|
282 * |
|
283 * This indicates either starvation or EOS from the upstream data sources. |
|
284 */ |
|
285 gst_multi_queue_signals[SIGNAL_UNDERRUN] = |
280 gst_multi_queue_signals[SIGNAL_UNDERRUN] = |
286 g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, |
281 g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, |
287 G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL, |
282 G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL, |
288 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); |
283 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); |
289 |
284 |
290 /** |
|
291 * GstMultiQueue::overrun: |
|
292 * @multiqueue: the multiqueue instance |
|
293 * |
|
294 * Reports that one of the queues in the multiqueue is full (overrun). |
|
295 * A queue is full if the total amount of data inside it (num-buffers, time, |
|
296 * size) is higher than the boundary values which can be set through the |
|
297 * GObject properties. |
|
298 * |
|
299 * This can be used as an indicator of pre-roll. |
|
300 */ |
|
301 gst_multi_queue_signals[SIGNAL_OVERRUN] = |
285 gst_multi_queue_signals[SIGNAL_OVERRUN] = |
302 g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, |
286 g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST, |
303 G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL, |
287 G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL, |
304 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); |
288 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); |
305 |
289 |
306 /* PROPERTIES */ |
290 /* PROPERTIES */ |
307 |
291 |
308 g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES, |
292 g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES, |
309 g_param_spec_uint ("max-size-bytes", "Max. size (kB)", |
293 g_param_spec_uint ("max-size-bytes", "Max. size (kB)", |
310 "Max. amount of data in the queue (bytes, 0=disable)", |
294 "Max. amount of data in the queue (bytes, 0=disable)", |
311 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, |
295 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE)); |
312 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
313 g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS, |
296 g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS, |
314 g_param_spec_uint ("max-size-buffers", "Max. size (buffers)", |
297 g_param_spec_uint ("max-size-buffers", "Max. size (buffers)", |
315 "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT, |
298 "Max. number of buffers in the queue (0=disable)", |
316 DEFAULT_MAX_SIZE_BUFFERS, |
299 0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE)); |
317 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
318 g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME, |
300 g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME, |
319 g_param_spec_uint64 ("max-size-time", "Max. size (ns)", |
301 g_param_spec_uint64 ("max-size-time", "Max. size (ns)", |
320 "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64, |
302 "Max. amount of data in the queue (in ns, 0=disable)", |
321 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
303 0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE)); |
322 |
304 |
323 g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES, |
305 g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES, |
324 g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)", |
306 g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)", |
325 "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)", |
307 "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)", |
326 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, |
308 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, G_PARAM_READWRITE)); |
327 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
328 g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS, |
309 g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS, |
329 g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)", |
310 g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)", |
330 "Amount of buffers the queues can grow if one of them is empty (0=disable)", |
311 "Amount of buffers the queues can grow if one of them is empty (0=disable)", |
331 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, |
312 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, G_PARAM_READWRITE)); |
332 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
333 g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME, |
313 g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME, |
334 g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)", |
314 g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)", |
335 "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)", |
315 "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)", |
336 0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, |
316 0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, G_PARAM_READWRITE)); |
337 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
338 |
317 |
339 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize); |
318 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize); |
340 |
319 |
341 gstelement_class->request_new_pad = |
320 gstelement_class->request_new_pad = |
342 GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad); |
321 GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad); |
624 gint64 sink_time, src_time; |
603 gint64 sink_time, src_time; |
625 |
604 |
626 sink_time = |
605 sink_time = |
627 gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME, |
606 gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME, |
628 sq->sink_segment.last_stop); |
607 sq->sink_segment.last_stop); |
629 if (sink_time == GST_CLOCK_TIME_NONE) |
|
630 goto beach; |
|
631 |
608 |
632 src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME, |
609 src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME, |
633 sq->src_segment.last_stop); |
610 sq->src_segment.last_stop); |
634 if (src_time == GST_CLOCK_TIME_NONE) |
|
635 goto beach; |
|
636 |
611 |
637 GST_DEBUG_OBJECT (mq, |
612 GST_DEBUG_OBJECT (mq, |
638 "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id, |
613 "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id, |
639 GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time)); |
614 GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time)); |
640 |
615 |
641 /* This allows for streams with out of order timestamping - sometimes the |
616 /* This allows for streams with out of order timestamping - sometimes the |
642 * emerging timestamp is later than the arriving one(s) */ |
617 * emerging timestamp is later than the arriving one(s) */ |
643 if (sink_time < src_time) |
618 if (sink_time >= src_time) |
644 goto beach; |
619 sq->cur_time = sink_time - src_time; |
645 |
620 else |
646 sq->cur_time = sink_time - src_time; |
621 sq->cur_time = 0; |
647 return; |
|
648 |
|
649 beach: |
|
650 sq->cur_time = 0; |
|
651 } |
622 } |
652 |
623 |
653 /* take a NEWSEGMENT event and apply the values to segment, updating the time |
624 /* take a NEWSEGMENT event and apply the values to segment, updating the time |
654 * level of queue. */ |
625 * level of queue. */ |
655 static void |
626 static void |
794 static void |
765 static void |
795 gst_multi_queue_item_destroy (GstMultiQueueItem * item) |
766 gst_multi_queue_item_destroy (GstMultiQueueItem * item) |
796 { |
767 { |
797 if (item->object) |
768 if (item->object) |
798 gst_mini_object_unref (item->object); |
769 gst_mini_object_unref (item->object); |
799 g_slice_free (GstMultiQueueItem, item); |
770 g_free (item); |
800 } |
771 } |
801 |
772 |
802 /* takes ownership of passed mini object! */ |
773 /* takes ownership of passed mini object! */ |
803 static GstMultiQueueItem * |
774 static GstMultiQueueItem * |
804 gst_multi_queue_item_new (GstMiniObject * object, guint32 curid) |
775 gst_multi_queue_item_new (GstMiniObject * object, guint32 curid) |
805 { |
776 { |
806 GstMultiQueueItem *item; |
777 GstMultiQueueItem *item; |
807 |
778 |
808 item = g_slice_new (GstMultiQueueItem); |
779 item = g_new (GstMultiQueueItem, 1); |
809 item->object = object; |
780 item->object = object; |
810 item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy; |
781 item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy; |
811 item->posid = curid; |
782 item->posid = curid; |
812 |
783 |
813 if (GST_IS_BUFFER (object)) { |
784 if (GST_IS_BUFFER (object)) { |
1089 * a lock, the _check_full happens from this thread only, right before pushing |
1060 * a lock, the _check_full happens from this thread only, right before pushing |
1090 * into dataqueue. */ |
1061 * into dataqueue. */ |
1091 switch (type) { |
1062 switch (type) { |
1092 case GST_EVENT_EOS: |
1063 case GST_EVENT_EOS: |
1093 sq->is_eos = TRUE; |
1064 sq->is_eos = TRUE; |
1094 single_queue_overrun_cb (sq->queue, sq); |
|
1095 break; |
1065 break; |
1096 case GST_EVENT_NEWSEGMENT: |
1066 case GST_EVENT_NEWSEGMENT: |
1097 apply_segment (mq, sq, sref, &sq->sink_segment); |
1067 apply_segment (mq, sq, sref, &sq->sink_segment); |
1098 gst_event_unref (sref); |
1068 gst_event_unref (sref); |
1099 break; |
1069 break; |
1295 GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id); |
1265 GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id); |
1296 |
1266 |
1297 if (gst_data_queue_is_empty (ssq->queue)) { |
1267 if (gst_data_queue_is_empty (ssq->queue)) { |
1298 GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id); |
1268 GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id); |
1299 if (IS_FILLED (visible, size.visible)) { |
1269 if (IS_FILLED (visible, size.visible)) { |
1300 sq->max_size.visible = size.visible + 1; |
1270 sq->max_size.visible++; |
1301 GST_DEBUG_OBJECT (mq, |
1271 GST_DEBUG_OBJECT (mq, |
1302 "Another queue is empty, bumping single queue %d max visible to %d", |
1272 "Another queue is empty, bumping single queue %d max visible to %d", |
1303 sq->id, sq->max_size.visible); |
1273 sq->id, sq->max_size.visible); |
1304 } |
1274 } |
1305 GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); |
1275 GST_MULTI_QUEUE_MUTEX_UNLOCK (mq); |
1312 "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%" |
1282 "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%" |
1313 G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible, |
1283 G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible, |
1314 ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time); |
1284 ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time); |
1315 |
1285 |
1316 /* if this queue is filled completely we must signal overrun */ |
1286 /* if this queue is filled completely we must signal overrun */ |
1317 if (sq->is_eos || IS_FILLED (bytes, ssize.bytes) || |
1287 if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, sq->cur_time)) { |
1318 IS_FILLED (time, sq->cur_time)) { |
|
1319 GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id); |
1288 GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id); |
1320 filled = TRUE; |
1289 filled = TRUE; |
1321 } |
1290 } |
1322 } |
1291 } |
1323 /* no queues were empty */ |
1292 /* no queues were empty */ |
1350 if (gst_data_queue_is_full (sq->queue)) { |
1319 if (gst_data_queue_is_full (sq->queue)) { |
1351 GstDataQueueSize size; |
1320 GstDataQueueSize size; |
1352 |
1321 |
1353 gst_data_queue_get_level (sq->queue, &size); |
1322 gst_data_queue_get_level (sq->queue, &size); |
1354 if (IS_FILLED (visible, size.visible)) { |
1323 if (IS_FILLED (visible, size.visible)) { |
1355 sq->max_size.visible = size.visible + 1; |
1324 sq->max_size.visible++; |
1356 GST_DEBUG_OBJECT (mq, |
1325 GST_DEBUG_OBJECT (mq, |
1357 "queue %d is filled, bumping its max visible to %d", sq->id, |
1326 "queue %d is filled, bumping its max visible to %d", sq->id, |
1358 sq->max_size.visible); |
1327 sq->max_size.visible); |
1359 gst_data_queue_limits_changed (sq->queue); |
1328 gst_data_queue_limits_changed (sq->queue); |
1360 } |
1329 } |
1386 |
1355 |
1387 /* we never go past the max visible items */ |
1356 /* we never go past the max visible items */ |
1388 if (IS_FILLED (visible, visible)) |
1357 if (IS_FILLED (visible, visible)) |
1389 return TRUE; |
1358 return TRUE; |
1390 |
1359 |
1391 /* check time or bytes */ |
1360 if (sq->cur_time != 0) { |
1392 res = IS_FILLED (time, sq->cur_time) || IS_FILLED (bytes, bytes); |
1361 /* if we have valid time in the queue, check */ |
1393 |
1362 res = IS_FILLED (time, sq->cur_time); |
1363 } else { |
|
1364 /* no valid time, check bytes */ |
|
1365 res = IS_FILLED (bytes, bytes); |
|
1366 } |
|
1394 return res; |
1367 return res; |
1395 } |
1368 } |
1396 |
1369 |
1397 static void |
1370 static void |
1398 gst_single_queue_free (GstSingleQueue * sq) |
1371 gst_single_queue_free (GstSingleQueue * sq) |
1480 GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links)); |
1453 GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links)); |
1481 |
1454 |
1482 gst_pad_set_element_private (sq->sinkpad, (gpointer) sq); |
1455 gst_pad_set_element_private (sq->sinkpad, (gpointer) sq); |
1483 gst_pad_set_element_private (sq->srcpad, (gpointer) sq); |
1456 gst_pad_set_element_private (sq->srcpad, (gpointer) sq); |
1484 |
1457 |
1485 /* only activate the pads when we are not in the NULL state |
1458 gst_pad_set_active (sq->srcpad, TRUE); |
1486 * and add the pad under the state_lock to prevend state changes |
|
1487 * between activating and adding */ |
|
1488 g_static_rec_mutex_lock (GST_STATE_GET_LOCK (mqueue)); |
|
1489 if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) { |
|
1490 gst_pad_set_active (sq->srcpad, TRUE); |
|
1491 gst_pad_set_active (sq->sinkpad, TRUE); |
|
1492 } |
|
1493 gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad); |
1459 gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad); |
1460 |
|
1461 gst_pad_set_active (sq->sinkpad, TRUE); |
|
1494 gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad); |
1462 gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad); |
1495 g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (mqueue)); |
|
1496 |
1463 |
1497 GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added", |
1464 GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added", |
1498 sq->id); |
1465 sq->id); |
1499 |
1466 |
1500 return sq; |
1467 return sq; |