--- a/gstreamer_core/plugins/elements/gstmultiqueue.c Tue Aug 31 15:30:33 2010 +0300
+++ b/gstreamer_core/plugins/elements/gstmultiqueue.c Wed Sep 01 12:16:41 2010 +0100
@@ -23,6 +23,7 @@
/**
* SECTION:element-multiqueue
+ * @short_description: Asynchronous data queues
* @see_also: #GstQueue
*
* <refsect2>
@@ -34,12 +35,12 @@
* <itemizedlist><title>Multiple streamhandling</title>
* <listitem><para>
* The element handles queueing data on more than one stream at once. To
- * achieve such a feature it has request sink pads (sink%d) and
- * 'sometimes' src pads (src%d).
+ * achieve such a feature it has request sink pads (sink_%d) and
+ * 'sometimes' src pads (src_%d).
* </para><para>
* When requesting a given sinkpad with gst_element_get_request_pad(),
* the associated srcpad for that stream will be created.
- * Example: requesting sink1 will generate src1.
+ * Ex: requesting sink_1 will generate src_1.
* </para></listitem>
* </itemizedlist>
* </listitem>
@@ -111,6 +112,11 @@
#include <gst/gst.h>
#include "gstmultiqueue.h"
+#ifdef __SYMBIAN32__
+#include <glib_global.h>
+#include <gobject_global.h>
+#endif
+
/**
* GstSingleQueue:
* @sinkpad: associated sink #GstPad
@@ -169,7 +175,6 @@
static void wake_up_next_non_linked (GstMultiQueue * mq);
static void compute_high_id (GstMultiQueue * mq);
-static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d",
GST_PAD_SINK,
@@ -272,32 +277,11 @@
GST_DEBUG_FUNCPTR (gst_multi_queue_get_property);
/* SIGNALS */
-
- /**
- * GstMultiQueue::underrun:
- * @multiqueue: the multqueue instance
- *
- * This signal is emitted from the streaming thread when there is
- * no data in any of the queues inside the multiqueue instance (underrun).
- *
- * This indicates either starvation or EOS from the upstream data sources.
- */
gst_multi_queue_signals[SIGNAL_UNDERRUN] =
g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
- /**
- * GstMultiQueue::overrun:
- * @multiqueue: the multiqueue instance
- *
- * Reports that one of the queues in the multiqueue is full (overrun).
- * A queue is full if the total amount of data inside it (num-buffers, time,
- * size) is higher than the boundary values which can be set through the
- * GObject properties.
- *
- * This can be used as an indicator of pre-roll.
- */
gst_multi_queue_signals[SIGNAL_OVERRUN] =
g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
@@ -308,33 +292,28 @@
g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
"Max. amount of data in the queue (bytes, 0=disable)",
- 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
- "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
- DEFAULT_MAX_SIZE_BUFFERS,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ "Max. number of buffers in the queue (0=disable)",
+ 0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
- "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
- DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ "Max. amount of data in the queue (in ns, 0=disable)",
+ 0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES,
g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
"Amount of data the queues can grow if one of them is empty (bytes, 0=disable)",
- 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS,
g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
"Amount of buffers the queues can grow if one of them is empty (0=disable)",
- 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME,
g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
"Amount of time the queues can grow if one of them is empty (in ns, 0=disable)",
- 0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME,
- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ 0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, G_PARAM_READWRITE));
gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize);
@@ -626,28 +605,20 @@
sink_time =
gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
sq->sink_segment.last_stop);
- if (sink_time == GST_CLOCK_TIME_NONE)
- goto beach;
src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME,
sq->src_segment.last_stop);
- if (src_time == GST_CLOCK_TIME_NONE)
- goto beach;
GST_DEBUG_OBJECT (mq,
"queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
- /* This allows for streams with out of order timestamping - sometimes the
+ /* This allows for streams with out of order timestamping - sometimes the
* emerging timestamp is later than the arriving one(s) */
- if (sink_time < src_time)
- goto beach;
-
- sq->cur_time = sink_time - src_time;
- return;
-
-beach:
- sq->cur_time = 0;
+ if (sink_time >= src_time)
+ sq->cur_time = sink_time - src_time;
+ else
+ sq->cur_time = 0;
}
/* take a NEWSEGMENT event and apply the values to segment, updating the time
@@ -796,7 +767,7 @@
{
if (item->object)
gst_mini_object_unref (item->object);
- g_slice_free (GstMultiQueueItem, item);
+ g_free (item);
}
/* takes ownership of passed mini object! */
@@ -805,7 +776,7 @@
{
GstMultiQueueItem *item;
- item = g_slice_new (GstMultiQueueItem);
+ item = g_new (GstMultiQueueItem, 1);
item->object = object;
item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
item->posid = curid;
@@ -1091,7 +1062,6 @@
switch (type) {
case GST_EVENT_EOS:
sq->is_eos = TRUE;
- single_queue_overrun_cb (sq->queue, sq);
break;
case GST_EVENT_NEWSEGMENT:
apply_segment (mq, sq, sref, &sq->sink_segment);
@@ -1297,7 +1267,7 @@
if (gst_data_queue_is_empty (ssq->queue)) {
GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id);
if (IS_FILLED (visible, size.visible)) {
- sq->max_size.visible = size.visible + 1;
+ sq->max_size.visible++;
GST_DEBUG_OBJECT (mq,
"Another queue is empty, bumping single queue %d max visible to %d",
sq->id, sq->max_size.visible);
@@ -1314,8 +1284,7 @@
ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
/* if this queue is filled completely we must signal overrun */
- if (sq->is_eos || IS_FILLED (bytes, ssize.bytes) ||
- IS_FILLED (time, sq->cur_time)) {
+ if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, sq->cur_time)) {
GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id);
filled = TRUE;
}
@@ -1352,7 +1321,7 @@
gst_data_queue_get_level (sq->queue, &size);
if (IS_FILLED (visible, size.visible)) {
- sq->max_size.visible = size.visible + 1;
+ sq->max_size.visible++;
GST_DEBUG_OBJECT (mq,
"queue %d is filled, bumping its max visible to %d", sq->id,
sq->max_size.visible);
@@ -1388,9 +1357,13 @@
if (IS_FILLED (visible, visible))
return TRUE;
- /* check time or bytes */
- res = IS_FILLED (time, sq->cur_time) || IS_FILLED (bytes, bytes);
-
+ if (sq->cur_time != 0) {
+ /* if we have valid time in the queue, check */
+ res = IS_FILLED (time, sq->cur_time);
+ } else {
+ /* no valid time, check bytes */
+ res = IS_FILLED (bytes, bytes);
+ }
return res;
}
@@ -1482,17 +1455,11 @@
gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
gst_pad_set_element_private (sq->srcpad, (gpointer) sq);
- /* only activate the pads when we are not in the NULL state
- * and add the pad under the state_lock to prevend state changes
- * between activating and adding */
- g_static_rec_mutex_lock (GST_STATE_GET_LOCK (mqueue));
- if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) {
- gst_pad_set_active (sq->srcpad, TRUE);
- gst_pad_set_active (sq->sinkpad, TRUE);
- }
+ gst_pad_set_active (sq->srcpad, TRUE);
gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
+
+ gst_pad_set_active (sq->sinkpad, TRUE);
gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad);
- g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (mqueue));
GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
sq->id);