gstreamer_core/plugins/elements/gstmultiqueue.c
branchRCL_3
changeset 30 7e817e7e631c
parent 29 567bb019e3e3
--- a/gstreamer_core/plugins/elements/gstmultiqueue.c	Tue Aug 31 15:30:33 2010 +0300
+++ b/gstreamer_core/plugins/elements/gstmultiqueue.c	Wed Sep 01 12:16:41 2010 +0100
@@ -23,6 +23,7 @@
 
 /**
  * SECTION:element-multiqueue
+ * @short_description: Asynchronous data queues
  * @see_also: #GstQueue
  *
  * <refsect2>
@@ -34,12 +35,12 @@
  *   <itemizedlist><title>Multiple streamhandling</title>
  *   <listitem><para>
  *     The element handles queueing data on more than one stream at once. To
- *     achieve such a feature it has request sink pads (sink%d) and
- *     'sometimes' src pads (src%d).
+ *     achieve such a feature it has request sink pads (sink_%d) and
+ *     'sometimes' src pads (src_%d).
  *   </para><para>
  *     When requesting a given sinkpad with gst_element_get_request_pad(),
  *     the associated srcpad for that stream will be created.
- *     Example: requesting sink1 will generate src1.
+ *     Ex: requesting sink_1 will generate src_1.
  *   </para></listitem>
  *   </itemizedlist>
  * </listitem>
@@ -111,6 +112,11 @@
 #include <gst/gst.h>
 #include "gstmultiqueue.h"
 
+#ifdef __SYMBIAN32__
+#include <glib_global.h>
+#include <gobject_global.h>
+#endif
+
 /**
  * GstSingleQueue:
  * @sinkpad: associated sink #GstPad
@@ -169,7 +175,6 @@
 
 static void wake_up_next_non_linked (GstMultiQueue * mq);
 static void compute_high_id (GstMultiQueue * mq);
-static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
 
 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d",
     GST_PAD_SINK,
@@ -272,32 +277,11 @@
       GST_DEBUG_FUNCPTR (gst_multi_queue_get_property);
 
   /* SIGNALS */
-
-  /**
-   * GstMultiQueue::underrun:
-   * @multiqueue: the multqueue instance
-   *
-   * This signal is emitted from the streaming thread when there is
-   * no data in any of the queues inside the multiqueue instance (underrun).
-   *
-   * This indicates either starvation or EOS from the upstream data sources.
-   */
   gst_multi_queue_signals[SIGNAL_UNDERRUN] =
       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
       G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
 
-  /**
-   * GstMultiQueue::overrun:
-   * @multiqueue: the multiqueue instance
-   *
-   * Reports that one of the queues in the multiqueue is full (overrun).
-   * A queue is full if the total amount of data inside it (num-buffers, time,
-   * size) is higher than the boundary values which can be set through the
-   * GObject properties.
-   *
-   * This can be used as an indicator of pre-roll. 
-   */
   gst_multi_queue_signals[SIGNAL_OVERRUN] =
       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
       G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
@@ -308,33 +292,28 @@
   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
           "Max. amount of data in the queue (bytes, 0=disable)",
-          0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE));
   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
-          "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
-          DEFAULT_MAX_SIZE_BUFFERS,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          "Max. number of buffers in the queue (0=disable)",
+          0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE));
   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
-          "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
-          DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          "Max. amount of data in the queue (in ns, 0=disable)",
+          0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE));
 
   g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES,
       g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
           "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)",
-          0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, G_PARAM_READWRITE));
   g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS,
       g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
           "Amount of buffers the queues can grow if one of them is empty (0=disable)",
-          0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, G_PARAM_READWRITE));
   g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME,
       g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
           "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)",
-          0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, G_PARAM_READWRITE));
 
   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize);
 
@@ -626,28 +605,20 @@
   sink_time =
       gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
       sq->sink_segment.last_stop);
-  if (sink_time == GST_CLOCK_TIME_NONE)
-    goto beach;
 
   src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME,
       sq->src_segment.last_stop);
-  if (src_time == GST_CLOCK_TIME_NONE)
-    goto beach;
 
   GST_DEBUG_OBJECT (mq,
       "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
       GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
 
-  /* This allows for streams with out of order timestamping - sometimes the
+  /* This allows for streams with out of order timestamping - sometimes the 
    * emerging timestamp is later than the arriving one(s) */
-  if (sink_time < src_time)
-    goto beach;
-
-  sq->cur_time = sink_time - src_time;
-  return;
-
-beach:
-  sq->cur_time = 0;
+  if (sink_time >= src_time)
+    sq->cur_time = sink_time - src_time;
+  else
+    sq->cur_time = 0;
 }
 
 /* take a NEWSEGMENT event and apply the values to segment, updating the time
@@ -796,7 +767,7 @@
 {
   if (item->object)
     gst_mini_object_unref (item->object);
-  g_slice_free (GstMultiQueueItem, item);
+  g_free (item);
 }
 
 /* takes ownership of passed mini object! */
@@ -805,7 +776,7 @@
 {
   GstMultiQueueItem *item;
 
-  item = g_slice_new (GstMultiQueueItem);
+  item = g_new (GstMultiQueueItem, 1);
   item->object = object;
   item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
   item->posid = curid;
@@ -1091,7 +1062,6 @@
   switch (type) {
     case GST_EVENT_EOS:
       sq->is_eos = TRUE;
-      single_queue_overrun_cb (sq->queue, sq);
       break;
     case GST_EVENT_NEWSEGMENT:
       apply_segment (mq, sq, sref, &sq->sink_segment);
@@ -1297,7 +1267,7 @@
     if (gst_data_queue_is_empty (ssq->queue)) {
       GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id);
       if (IS_FILLED (visible, size.visible)) {
-        sq->max_size.visible = size.visible + 1;
+        sq->max_size.visible++;
         GST_DEBUG_OBJECT (mq,
             "Another queue is empty, bumping single queue %d max visible to %d",
             sq->id, sq->max_size.visible);
@@ -1314,8 +1284,7 @@
         ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
 
     /* if this queue is filled completely we must signal overrun */
-    if (sq->is_eos || IS_FILLED (bytes, ssize.bytes) ||
-        IS_FILLED (time, sq->cur_time)) {
+    if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, sq->cur_time)) {
       GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id);
       filled = TRUE;
     }
@@ -1352,7 +1321,7 @@
 
       gst_data_queue_get_level (sq->queue, &size);
       if (IS_FILLED (visible, size.visible)) {
-        sq->max_size.visible = size.visible + 1;
+        sq->max_size.visible++;
         GST_DEBUG_OBJECT (mq,
             "queue %d is filled, bumping its max visible to %d", sq->id,
             sq->max_size.visible);
@@ -1388,9 +1357,13 @@
   if (IS_FILLED (visible, visible))
     return TRUE;
 
-  /* check time or bytes */
-  res = IS_FILLED (time, sq->cur_time) || IS_FILLED (bytes, bytes);
-
+  if (sq->cur_time != 0) {
+    /* if we have valid time in the queue, check */
+    res = IS_FILLED (time, sq->cur_time);
+  } else {
+    /* no valid time, check bytes */
+    res = IS_FILLED (bytes, bytes);
+  }
   return res;
 }
 
@@ -1482,17 +1455,11 @@
   gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
   gst_pad_set_element_private (sq->srcpad, (gpointer) sq);
 
-  /* only activate the pads when we are not in the NULL state
-   * and add the pad under the state_lock to prevend state changes
-   * between activating and adding */
-  g_static_rec_mutex_lock (GST_STATE_GET_LOCK (mqueue));
-  if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) {
-    gst_pad_set_active (sq->srcpad, TRUE);
-    gst_pad_set_active (sq->sinkpad, TRUE);
-  }
+  gst_pad_set_active (sq->srcpad, TRUE);
   gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
+
+  gst_pad_set_active (sq->sinkpad, TRUE);
   gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad);
-  g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (mqueue));
 
   GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
       sq->id);