gstreamer_core/plugins/elements/gstqueue.c
branchRCL_3
changeset 29 567bb019e3e3
parent 0 0e761a78d257
child 30 7e817e7e631c
--- a/gstreamer_core/plugins/elements/gstqueue.c	Wed Mar 31 22:03:18 2010 +0300
+++ b/gstreamer_core/plugins/elements/gstqueue.c	Tue Aug 31 15:30:33 2010 +0300
@@ -24,7 +24,6 @@
 
 /**
  * SECTION:element-queue
- * @short_description: Simple asynchronous data queue.
  *
  * Data is queued until one of the limits specified by the
  * #GstQueue:max-size-buffers, #GstQueue:max-size-bytes and/or
@@ -55,9 +54,6 @@
  * empty). The #GstQueue::overrun signal is emitted when the queue is filled
  * up. Both signals are emitted from the context of the streaming thread.
  */
-#ifdef __SYMBIAN32__
-#include <gst_global.h>
-#endif
 
 #include "gst/gst_private.h"
 
@@ -65,11 +61,6 @@
 #include "gstqueue.h"
 
 #include "../../gst/gst-i18n-lib.h"
-#ifdef __SYMBIAN32__
-#include <glib_global.h>
-#include <gobject_global.h>
-
-#endif
 
 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
     GST_PAD_SINK,
@@ -205,6 +196,7 @@
 static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event);
 static gboolean gst_queue_handle_src_query (GstPad * pad, GstQuery * query);
 
+static gboolean gst_queue_acceptcaps (GstPad * pad, GstCaps * caps);
 static GstCaps *gst_queue_getcaps (GstPad * pad);
 static GstPadLinkReturn gst_queue_link_sink (GstPad * pad, GstPad * peer);
 static GstPadLinkReturn gst_queue_link_src (GstPad * pad, GstPad * peer);
@@ -212,8 +204,6 @@
 
 static gboolean gst_queue_src_activate_push (GstPad * pad, gboolean active);
 static gboolean gst_queue_sink_activate_push (GstPad * pad, gboolean active);
-static GstStateChangeReturn gst_queue_change_state (GstElement * element,
-    GstStateChange transition);
 
 static gboolean gst_queue_is_empty (GstQueue * queue);
 static gboolean gst_queue_is_filled (GstQueue * queue);
@@ -258,7 +248,6 @@
 gst_queue_class_init (GstQueueClass * klass)
 {
   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
-  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
 
   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
@@ -318,51 +307,52 @@
   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
           "Current amount of data in the queue (bytes)",
-          0, G_MAXUINT, 0, G_PARAM_READABLE));
+          0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BUFFERS,
       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
           "Current number of buffers in the queue",
-          0, G_MAXUINT, 0, G_PARAM_READABLE));
+          0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
           "Current amount of data in the queue (in ns)",
-          0, G_MAXUINT64, 0, G_PARAM_READABLE));
+          0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
           "Max. amount of data in the queue (bytes, 0=disable)",
-          0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE));
+          0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
-          "Max. number of buffers in the queue (0=disable)",
-          0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE));
+          "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
+          DEFAULT_MAX_SIZE_BUFFERS,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
-          "Max. amount of data in the queue (in ns, 0=disable)",
-          0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE));
+          "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
+          DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BYTES,
       g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
           "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
-          0, G_MAXUINT, 0, G_PARAM_READWRITE));
+          0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BUFFERS,
       g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
           "Min. number of buffers in the queue to allow reading (0=disable)",
-          0, G_MAXUINT, 0, G_PARAM_READWRITE));
+          0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_TIME,
       g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
           "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
-          0, G_MAXUINT64, 0, G_PARAM_READWRITE));
+          0, G_MAXUINT64, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, ARG_LEAKY,
       g_param_spec_enum ("leaky", "Leaky",
           "Where the queue leaks, if at all",
-          GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
+          GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   /* set several parent class virtual functions */
   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_queue_finalize);
-
-  gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state);
 }
 
 static void
@@ -380,6 +370,8 @@
       GST_DEBUG_FUNCPTR (gst_queue_link_sink));
   gst_pad_set_getcaps_function (queue->sinkpad,
       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
+  gst_pad_set_acceptcaps_function (queue->sinkpad,
+      GST_DEBUG_FUNCPTR (gst_queue_acceptcaps));
   gst_pad_set_bufferalloc_function (queue->sinkpad,
       GST_DEBUG_FUNCPTR (gst_queue_bufferalloc));
   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
@@ -390,6 +382,8 @@
       GST_DEBUG_FUNCPTR (gst_queue_src_activate_push));
   gst_pad_set_link_function (queue->srcpad,
       GST_DEBUG_FUNCPTR (gst_queue_link_src));
+  gst_pad_set_acceptcaps_function (queue->srcpad,
+      GST_DEBUG_FUNCPTR (gst_queue_acceptcaps));
   gst_pad_set_getcaps_function (queue->srcpad,
       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
   gst_pad_set_event_function (queue->srcpad,
@@ -441,6 +435,21 @@
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
+static gboolean
+gst_queue_acceptcaps (GstPad * pad, GstCaps * caps)
+{
+  gboolean result;
+  GstQueue *queue;
+  GstPad *otherpad;
+
+  queue = GST_QUEUE (GST_PAD_PARENT (pad));
+
+  otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
+  result = gst_pad_peer_accept_caps (otherpad, caps);
+
+  return result;
+}
+
 static GstCaps *
 gst_queue_getcaps (GstPad * pad)
 {
@@ -801,14 +810,14 @@
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "refusing event, we are flushing");
     GST_QUEUE_MUTEX_UNLOCK (queue);
-    gst_buffer_unref (event);
+    gst_event_unref (event);
     return FALSE;
   }
 out_eos:
   {
     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "refusing event, we are EOS");
     GST_QUEUE_MUTEX_UNLOCK (queue);
-    gst_buffer_unref (event);
+    gst_event_unref (event);
     return FALSE;
   }
 }
@@ -816,13 +825,18 @@
 static gboolean
 gst_queue_is_empty (GstQueue * queue)
 {
-  return (queue->queue->length == 0 ||
-      (queue->min_threshold.buffers > 0 &&
+  if (queue->queue->length == 0)
+    return TRUE;
+
+  /* It is possible that a max size is reached before all min thresholds are.
+   * Therefore, only consider it empty if it is not filled. */
+  return ((queue->min_threshold.buffers > 0 &&
           queue->cur_level.buffers < queue->min_threshold.buffers) ||
       (queue->min_threshold.bytes > 0 &&
           queue->cur_level.bytes < queue->min_threshold.bytes) ||
       (queue->min_threshold.time > 0 &&
-          queue->cur_level.time < queue->min_threshold.time));
+          queue->cur_level.time < queue->min_threshold.time)) &&
+      !gst_queue_is_filled (queue);
 }
 
 static gboolean
@@ -836,6 +850,27 @@
               queue->cur_level.time >= queue->max_size.time)));
 }
 
+static void
+gst_queue_leak_downstream (GstQueue * queue)
+{
+  /* for as long as the queue is filled, dequeue an item and discard it */
+  while (gst_queue_is_filled (queue)) {
+    GstMiniObject *leak;
+
+    leak = gst_queue_locked_dequeue (queue);
+    /* there is nothing to dequeue and the queue is still filled.. This should
+     * not happen */
+    g_assert (leak != NULL);
+
+    GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
+        "queue is full, leaking item %p on downstream end", leak);
+    gst_mini_object_unref (leak);
+
+    /* last buffer needs to get a DISCONT flag */
+    queue->head_needs_discont = TRUE;
+  }
+}
+
 static GstFlowReturn
 gst_queue_chain (GstPad * pad, GstBuffer * buffer)
 {
@@ -883,25 +918,8 @@
         /* now we can clean up and exit right away */
         goto out_unref;
       case GST_QUEUE_LEAK_DOWNSTREAM:
-      {
-        /* for as long as the queue is filled, dequeue an item and discard 
-         * it. */
-        do {
-          GstMiniObject *leak;
-
-          leak = gst_queue_locked_dequeue (queue);
-          /* there is nothing to dequeue and the queue is still filled.. This
-           * should not happen. */
-          g_assert (leak != NULL);
-
-          GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
-              "queue is full, leaking item %p on downstream end", leak);
-          gst_buffer_unref (leak);
-        } while (gst_queue_is_filled (queue));
-        /* last buffer needs to get a DISCONT flag */
-        queue->head_needs_discont = TRUE;
+        gst_queue_leak_downstream (queue);
         break;
-      }
       default:
         g_warning ("Unknown leaky type, using default");
         /* fall-through */
@@ -1198,9 +1216,9 @@
           peer_pos -= queue->cur_level.time;
           break;
         default:
-          GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
+          GST_DEBUG_OBJECT (queue, "Can't adjust query in %s format, don't "
               "know how to adjust value", gst_format_get_name (format));
-          return FALSE;
+          return TRUE;
       }
       /* set updated position */
       gst_query_set_position (query, format, peer_pos);
@@ -1222,6 +1240,10 @@
       else
         max = -1;
 
+      /* adjust for min-threshold */
+      if (queue->min_threshold.time > 0 && min != -1)
+        min += queue->min_threshold.time;
+
       gst_query_set_latency (query, live, min, max);
       break;
     }
@@ -1298,47 +1320,19 @@
   return result;
 }
 
-static GstStateChangeReturn
-gst_queue_change_state (GstElement * element, GstStateChange transition)
+static void
+queue_capacity_change (GstQueue * queue)
 {
-  GstQueue *queue;
-  GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
-
-  queue = GST_QUEUE (element);
-
-  switch (transition) {
-    case GST_STATE_CHANGE_NULL_TO_READY:
-      break;
-    case GST_STATE_CHANGE_READY_TO_PAUSED:
-      break;
-    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
-      break;
-    default:
-      break;
+  if (queue->leaky == GST_QUEUE_LEAK_DOWNSTREAM) {
+    gst_queue_leak_downstream (queue);
   }
 
-  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
-
-  switch (transition) {
-    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
-      break;
-    case GST_STATE_CHANGE_PAUSED_TO_READY:
-      break;
-    case GST_STATE_CHANGE_READY_TO_NULL:
-      break;
-    default:
-      break;
-  }
-
-  return ret;
+  /* changing the capacity of the queue must wake up
+   * the _chain function, it might have more room now
+   * to store the buffer/event in the queue */
+  GST_QUEUE_SIGNAL_DEL (queue);
 }
 
-/* changing the capacity of the queue must wake up
- * the _chain function, it might have more room now
- * to store the buffer/event in the queue */
-#define QUEUE_CAPACITY_CHANGE(q)\
-  GST_QUEUE_SIGNAL_DEL (q);
-
 /* Changing the minimum required fill level must
  * wake up the _loop function as it might now
  * be able to preceed.
@@ -1359,15 +1353,15 @@
   switch (prop_id) {
     case ARG_MAX_SIZE_BYTES:
       queue->max_size.bytes = g_value_get_uint (value);
-      QUEUE_CAPACITY_CHANGE (queue);
+      queue_capacity_change (queue);
       break;
     case ARG_MAX_SIZE_BUFFERS:
       queue->max_size.buffers = g_value_get_uint (value);
-      QUEUE_CAPACITY_CHANGE (queue);
+      queue_capacity_change (queue);
       break;
     case ARG_MAX_SIZE_TIME:
       queue->max_size.time = g_value_get_uint64 (value);
-      QUEUE_CAPACITY_CHANGE (queue);
+      queue_capacity_change (queue);
       break;
     case ARG_MIN_THRESHOLD_BYTES:
       queue->min_threshold.bytes = g_value_get_uint (value);