gstreamer_core/plugins/elements/gstqueue.c
branch RCL_3
changeset 30 7e817e7e631c
parent 29 567bb019e3e3
equal deleted inserted replaced
29:567bb019e3e3 30:7e817e7e631c
    22  * Boston, MA 02111-1307, USA.
    22  * Boston, MA 02111-1307, USA.
    23  */
    23  */
    24 
    24 
    25 /**
    25 /**
    26  * SECTION:element-queue
    26  * SECTION:element-queue
       
    27  * @short_description: Simple asynchronous data queue.
    27  *
    28  *
    28  * Data is queued until one of the limits specified by the
    29  * Data is queued until one of the limits specified by the
    29  * #GstQueue:max-size-buffers, #GstQueue:max-size-bytes and/or
    30  * #GstQueue:max-size-buffers, #GstQueue:max-size-bytes and/or
    30  * #GstQueue:max-size-time properties has been reached. Any attempt to push
    31  * #GstQueue:max-size-time properties has been reached. Any attempt to push
    31  * more buffers into the queue will block the pushing thread until more space
    32  * more buffers into the queue will block the pushing thread until more space
    52  * The #GstQueue::underrun signal is emitted when the queue has less data than
    53  * The #GstQueue::underrun signal is emitted when the queue has less data than
    53  * the specified minimum thresholds require (by default: when the queue is
    54  * the specified minimum thresholds require (by default: when the queue is
    54  * empty). The #GstQueue::overrun signal is emitted when the queue is filled
    55  * empty). The #GstQueue::overrun signal is emitted when the queue is filled
    55  * up. Both signals are emitted from the context of the streaming thread.
    56  * up. Both signals are emitted from the context of the streaming thread.
    56  */
    57  */
       
    58 #ifdef __SYMBIAN32__
       
    59 #include <gst_global.h>
       
    60 #endif
    57 
    61 
    58 #include "gst/gst_private.h"
    62 #include "gst/gst_private.h"
    59 
    63 
    60 #include <gst/gst.h>
    64 #include <gst/gst.h>
    61 #include "gstqueue.h"
    65 #include "gstqueue.h"
    62 
    66 
    63 #include "../../gst/gst-i18n-lib.h"
    67 #include "../../gst/gst-i18n-lib.h"
       
    68 #ifdef __SYMBIAN32__
       
    69 #include <glib_global.h>
       
    70 #include <gobject_global.h>
       
    71 
       
    72 #endif
    64 
    73 
    65 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
    74 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
    66     GST_PAD_SINK,
    75     GST_PAD_SINK,
    67     GST_PAD_ALWAYS,
    76     GST_PAD_ALWAYS,
    68     GST_STATIC_CAPS_ANY);
    77     GST_STATIC_CAPS_ANY);
   194 static gboolean gst_queue_handle_sink_event (GstPad * pad, GstEvent * event);
   203 static gboolean gst_queue_handle_sink_event (GstPad * pad, GstEvent * event);
   195 
   204 
   196 static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event);
   205 static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event);
   197 static gboolean gst_queue_handle_src_query (GstPad * pad, GstQuery * query);
   206 static gboolean gst_queue_handle_src_query (GstPad * pad, GstQuery * query);
   198 
   207 
   199 static gboolean gst_queue_acceptcaps (GstPad * pad, GstCaps * caps);
       
   200 static GstCaps *gst_queue_getcaps (GstPad * pad);
   208 static GstCaps *gst_queue_getcaps (GstPad * pad);
   201 static GstPadLinkReturn gst_queue_link_sink (GstPad * pad, GstPad * peer);
   209 static GstPadLinkReturn gst_queue_link_sink (GstPad * pad, GstPad * peer);
   202 static GstPadLinkReturn gst_queue_link_src (GstPad * pad, GstPad * peer);
   210 static GstPadLinkReturn gst_queue_link_src (GstPad * pad, GstPad * peer);
   203 static void gst_queue_locked_flush (GstQueue * queue);
   211 static void gst_queue_locked_flush (GstQueue * queue);
   204 
   212 
   205 static gboolean gst_queue_src_activate_push (GstPad * pad, gboolean active);
   213 static gboolean gst_queue_src_activate_push (GstPad * pad, gboolean active);
   206 static gboolean gst_queue_sink_activate_push (GstPad * pad, gboolean active);
   214 static gboolean gst_queue_sink_activate_push (GstPad * pad, gboolean active);
       
   215 static GstStateChangeReturn gst_queue_change_state (GstElement * element,
       
   216     GstStateChange transition);
   207 
   217 
   208 static gboolean gst_queue_is_empty (GstQueue * queue);
   218 static gboolean gst_queue_is_empty (GstQueue * queue);
   209 static gboolean gst_queue_is_filled (GstQueue * queue);
   219 static gboolean gst_queue_is_filled (GstQueue * queue);
   210 
   220 
   211 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
   221 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
   246 
   256 
   247 static void
   257 static void
   248 gst_queue_class_init (GstQueueClass * klass)
   258 gst_queue_class_init (GstQueueClass * klass)
   249 {
   259 {
   250   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
   260   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
       
   261   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
   251 
   262 
   252   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
   263   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
   253   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
   264   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
   254 
   265 
   255   /* signals */
   266   /* signals */
   305 
   316 
   306   /* properties */
   317   /* properties */
   307   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
   318   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES,
   308       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
   319       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
   309           "Current amount of data in the queue (bytes)",
   320           "Current amount of data in the queue (bytes)",
   310           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   321           0, G_MAXUINT, 0, G_PARAM_READABLE));
   311   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BUFFERS,
   322   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BUFFERS,
   312       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
   323       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
   313           "Current number of buffers in the queue",
   324           "Current number of buffers in the queue",
   314           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   325           0, G_MAXUINT, 0, G_PARAM_READABLE));
   315   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
   326   g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME,
   316       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
   327       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
   317           "Current amount of data in the queue (in ns)",
   328           "Current amount of data in the queue (in ns)",
   318           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   329           0, G_MAXUINT64, 0, G_PARAM_READABLE));
   319 
   330 
   320   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
   331   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
   321       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
   332       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
   322           "Max. amount of data in the queue (bytes, 0=disable)",
   333           "Max. amount of data in the queue (bytes, 0=disable)",
   323           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
   334           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE));
   324           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   325   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
   335   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
   326       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
   336       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
   327           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
   337           "Max. number of buffers in the queue (0=disable)",
   328           DEFAULT_MAX_SIZE_BUFFERS,
   338           0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE));
   329           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   330   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
   339   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
   331       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
   340       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
   332           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
   341           "Max. amount of data in the queue (in ns, 0=disable)",
   333           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   342           0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE));
   334 
   343 
   335   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BYTES,
   344   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BYTES,
   336       g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
   345       g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
   337           "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
   346           "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
   338           0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   347           0, G_MAXUINT, 0, G_PARAM_READWRITE));
   339   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BUFFERS,
   348   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_BUFFERS,
   340       g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
   349       g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
   341           "Min. number of buffers in the queue to allow reading (0=disable)",
   350           "Min. number of buffers in the queue to allow reading (0=disable)",
   342           0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   351           0, G_MAXUINT, 0, G_PARAM_READWRITE));
   343   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_TIME,
   352   g_object_class_install_property (gobject_class, ARG_MIN_THRESHOLD_TIME,
   344       g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
   353       g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
   345           "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
   354           "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
   346           0, G_MAXUINT64, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   355           0, G_MAXUINT64, 0, G_PARAM_READWRITE));
   347 
   356 
   348   g_object_class_install_property (gobject_class, ARG_LEAKY,
   357   g_object_class_install_property (gobject_class, ARG_LEAKY,
   349       g_param_spec_enum ("leaky", "Leaky",
   358       g_param_spec_enum ("leaky", "Leaky",
   350           "Where the queue leaks, if at all",
   359           "Where the queue leaks, if at all",
   351           GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK,
   360           GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
   352           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   353 
   361 
   354   /* set several parent class virtual functions */
   362   /* set several parent class virtual functions */
   355   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_queue_finalize);
   363   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_queue_finalize);
       
   364 
       
   365   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state);
   356 }
   366 }
   357 
   367 
   358 static void
   368 static void
   359 gst_queue_init (GstQueue * queue, GstQueueClass * g_class)
   369 gst_queue_init (GstQueue * queue, GstQueueClass * g_class)
   360 {
   370 {
   368       GST_DEBUG_FUNCPTR (gst_queue_handle_sink_event));
   378       GST_DEBUG_FUNCPTR (gst_queue_handle_sink_event));
   369   gst_pad_set_link_function (queue->sinkpad,
   379   gst_pad_set_link_function (queue->sinkpad,
   370       GST_DEBUG_FUNCPTR (gst_queue_link_sink));
   380       GST_DEBUG_FUNCPTR (gst_queue_link_sink));
   371   gst_pad_set_getcaps_function (queue->sinkpad,
   381   gst_pad_set_getcaps_function (queue->sinkpad,
   372       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
   382       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
   373   gst_pad_set_acceptcaps_function (queue->sinkpad,
       
   374       GST_DEBUG_FUNCPTR (gst_queue_acceptcaps));
       
   375   gst_pad_set_bufferalloc_function (queue->sinkpad,
   383   gst_pad_set_bufferalloc_function (queue->sinkpad,
   376       GST_DEBUG_FUNCPTR (gst_queue_bufferalloc));
   384       GST_DEBUG_FUNCPTR (gst_queue_bufferalloc));
   377   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
   385   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
   378 
   386 
   379   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
   387   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
   380 
   388 
   381   gst_pad_set_activatepush_function (queue->srcpad,
   389   gst_pad_set_activatepush_function (queue->srcpad,
   382       GST_DEBUG_FUNCPTR (gst_queue_src_activate_push));
   390       GST_DEBUG_FUNCPTR (gst_queue_src_activate_push));
   383   gst_pad_set_link_function (queue->srcpad,
   391   gst_pad_set_link_function (queue->srcpad,
   384       GST_DEBUG_FUNCPTR (gst_queue_link_src));
   392       GST_DEBUG_FUNCPTR (gst_queue_link_src));
   385   gst_pad_set_acceptcaps_function (queue->srcpad,
       
   386       GST_DEBUG_FUNCPTR (gst_queue_acceptcaps));
       
   387   gst_pad_set_getcaps_function (queue->srcpad,
   393   gst_pad_set_getcaps_function (queue->srcpad,
   388       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
   394       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
   389   gst_pad_set_event_function (queue->srcpad,
   395   gst_pad_set_event_function (queue->srcpad,
   390       GST_DEBUG_FUNCPTR (gst_queue_handle_src_event));
   396       GST_DEBUG_FUNCPTR (gst_queue_handle_src_event));
   391   gst_pad_set_query_function (queue->srcpad,
   397   gst_pad_set_query_function (queue->srcpad,
   431   g_mutex_free (queue->qlock);
   437   g_mutex_free (queue->qlock);
   432   g_cond_free (queue->item_add);
   438   g_cond_free (queue->item_add);
   433   g_cond_free (queue->item_del);
   439   g_cond_free (queue->item_del);
   434 
   440 
   435   G_OBJECT_CLASS (parent_class)->finalize (object);
   441   G_OBJECT_CLASS (parent_class)->finalize (object);
   436 }
       
   437 
       
   438 static gboolean
       
   439 gst_queue_acceptcaps (GstPad * pad, GstCaps * caps)
       
   440 {
       
   441   gboolean result;
       
   442   GstQueue *queue;
       
   443   GstPad *otherpad;
       
   444 
       
   445   queue = GST_QUEUE (GST_PAD_PARENT (pad));
       
   446 
       
   447   otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
       
   448   result = gst_pad_peer_accept_caps (otherpad, caps);
       
   449 
       
   450   return result;
       
   451 }
   442 }
   452 
   443 
   453 static GstCaps *
   444 static GstCaps *
   454 gst_queue_getcaps (GstPad * pad)
   445 gst_queue_getcaps (GstPad * pad)
   455 {
   446 {
   808 out_flushing:
   799 out_flushing:
   809   {
   800   {
   810     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
   801     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
   811         "refusing event, we are flushing");
   802         "refusing event, we are flushing");
   812     GST_QUEUE_MUTEX_UNLOCK (queue);
   803     GST_QUEUE_MUTEX_UNLOCK (queue);
   813     gst_event_unref (event);
   804     gst_buffer_unref (event);
   814     return FALSE;
   805     return FALSE;
   815   }
   806   }
   816 out_eos:
   807 out_eos:
   817   {
   808   {
   818     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "refusing event, we are EOS");
   809     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "refusing event, we are EOS");
   819     GST_QUEUE_MUTEX_UNLOCK (queue);
   810     GST_QUEUE_MUTEX_UNLOCK (queue);
   820     gst_event_unref (event);
   811     gst_buffer_unref (event);
   821     return FALSE;
   812     return FALSE;
   822   }
   813   }
   823 }
   814 }
   824 
   815 
   825 static gboolean
   816 static gboolean
   826 gst_queue_is_empty (GstQueue * queue)
   817 gst_queue_is_empty (GstQueue * queue)
   827 {
   818 {
   828   if (queue->queue->length == 0)
   819   return (queue->queue->length == 0 ||
   829     return TRUE;
   820       (queue->min_threshold.buffers > 0 &&
   830 
       
   831   /* It is possible that a max size is reached before all min thresholds are.
       
   832    * Therefore, only consider it empty if it is not filled. */
       
   833   return ((queue->min_threshold.buffers > 0 &&
       
   834           queue->cur_level.buffers < queue->min_threshold.buffers) ||
   821           queue->cur_level.buffers < queue->min_threshold.buffers) ||
   835       (queue->min_threshold.bytes > 0 &&
   822       (queue->min_threshold.bytes > 0 &&
   836           queue->cur_level.bytes < queue->min_threshold.bytes) ||
   823           queue->cur_level.bytes < queue->min_threshold.bytes) ||
   837       (queue->min_threshold.time > 0 &&
   824       (queue->min_threshold.time > 0 &&
   838           queue->cur_level.time < queue->min_threshold.time)) &&
   825           queue->cur_level.time < queue->min_threshold.time));
   839       !gst_queue_is_filled (queue);
       
   840 }
   826 }
   841 
   827 
   842 static gboolean
   828 static gboolean
   843 gst_queue_is_filled (GstQueue * queue)
   829 gst_queue_is_filled (GstQueue * queue)
   844 {
   830 {
   846               queue->cur_level.buffers >= queue->max_size.buffers) ||
   832               queue->cur_level.buffers >= queue->max_size.buffers) ||
   847           (queue->max_size.bytes > 0 &&
   833           (queue->max_size.bytes > 0 &&
   848               queue->cur_level.bytes >= queue->max_size.bytes) ||
   834               queue->cur_level.bytes >= queue->max_size.bytes) ||
   849           (queue->max_size.time > 0 &&
   835           (queue->max_size.time > 0 &&
   850               queue->cur_level.time >= queue->max_size.time)));
   836               queue->cur_level.time >= queue->max_size.time)));
   851 }
       
   852 
       
   853 static void
       
   854 gst_queue_leak_downstream (GstQueue * queue)
       
   855 {
       
   856   /* for as long as the queue is filled, dequeue an item and discard it */
       
   857   while (gst_queue_is_filled (queue)) {
       
   858     GstMiniObject *leak;
       
   859 
       
   860     leak = gst_queue_locked_dequeue (queue);
       
   861     /* there is nothing to dequeue and the queue is still filled.. This should
       
   862      * not happen */
       
   863     g_assert (leak != NULL);
       
   864 
       
   865     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
       
   866         "queue is full, leaking item %p on downstream end", leak);
       
   867     gst_mini_object_unref (leak);
       
   868 
       
   869     /* last buffer needs to get a DISCONT flag */
       
   870     queue->head_needs_discont = TRUE;
       
   871   }
       
   872 }
   837 }
   873 
   838 
   874 static GstFlowReturn
   839 static GstFlowReturn
   875 gst_queue_chain (GstPad * pad, GstBuffer * buffer)
   840 gst_queue_chain (GstPad * pad, GstBuffer * buffer)
   876 {
   841 {
   916         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
   881         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
   917             "queue is full, leaking buffer on upstream end");
   882             "queue is full, leaking buffer on upstream end");
   918         /* now we can clean up and exit right away */
   883         /* now we can clean up and exit right away */
   919         goto out_unref;
   884         goto out_unref;
   920       case GST_QUEUE_LEAK_DOWNSTREAM:
   885       case GST_QUEUE_LEAK_DOWNSTREAM:
   921         gst_queue_leak_downstream (queue);
   886       {
       
   887         /* for as long as the queue is filled, dequeue an item and discard 
       
   888          * it. */
       
   889         do {
       
   890           GstMiniObject *leak;
       
   891 
       
   892           leak = gst_queue_locked_dequeue (queue);
       
   893           /* there is nothing to dequeue and the queue is still filled.. This
       
   894            * should not happen. */
       
   895           g_assert (leak != NULL);
       
   896 
       
   897           GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
       
   898               "queue is full, leaking item %p on downstream end", leak);
       
   899           gst_buffer_unref (leak);
       
   900         } while (gst_queue_is_filled (queue));
       
   901         /* last buffer needs to get a DISCONT flag */
       
   902         queue->head_needs_discont = TRUE;
   922         break;
   903         break;
       
   904       }
   923       default:
   905       default:
   924         g_warning ("Unknown leaky type, using default");
   906         g_warning ("Unknown leaky type, using default");
   925         /* fall-through */
   907         /* fall-through */
   926       case GST_QUEUE_NO_LEAK:
   908       case GST_QUEUE_NO_LEAK:
   927       {
   909       {
  1214           break;
  1196           break;
  1215         case GST_FORMAT_TIME:
  1197         case GST_FORMAT_TIME:
  1216           peer_pos -= queue->cur_level.time;
  1198           peer_pos -= queue->cur_level.time;
  1217           break;
  1199           break;
  1218         default:
  1200         default:
  1219           GST_DEBUG_OBJECT (queue, "Can't adjust query in %s format, don't "
  1201           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
  1220               "know how to adjust value", gst_format_get_name (format));
  1202               "know how to adjust value", gst_format_get_name (format));
  1221           return TRUE;
  1203           return FALSE;
  1222       }
  1204       }
  1223       /* set updated position */
  1205       /* set updated position */
  1224       gst_query_set_position (query, format, peer_pos);
  1206       gst_query_set_position (query, format, peer_pos);
  1225       break;
  1207       break;
  1226     }
  1208     }
  1237        * possible right now. */
  1219        * possible right now. */
  1238       if (queue->max_size.time > 0 && max != -1)
  1220       if (queue->max_size.time > 0 && max != -1)
  1239         max += queue->max_size.time;
  1221         max += queue->max_size.time;
  1240       else
  1222       else
  1241         max = -1;
  1223         max = -1;
  1242 
       
  1243       /* adjust for min-threshold */
       
  1244       if (queue->min_threshold.time > 0 && min != -1)
       
  1245         min += queue->min_threshold.time;
       
  1246 
  1224 
  1247       gst_query_set_latency (query, live, min, max);
  1225       gst_query_set_latency (query, live, min, max);
  1248       break;
  1226       break;
  1249     }
  1227     }
  1250     default:
  1228     default:
  1318   gst_object_unref (queue);
  1296   gst_object_unref (queue);
  1319 
  1297 
  1320   return result;
  1298   return result;
  1321 }
  1299 }
  1322 
  1300 
  1323 static void
  1301 static GstStateChangeReturn
  1324 queue_capacity_change (GstQueue * queue)
  1302 gst_queue_change_state (GstElement * element, GstStateChange transition)
  1325 {
  1303 {
  1326   if (queue->leaky == GST_QUEUE_LEAK_DOWNSTREAM) {
  1304   GstQueue *queue;
  1327     gst_queue_leak_downstream (queue);
  1305   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
  1328   }
  1306 
  1329 
  1307   queue = GST_QUEUE (element);
  1330   /* changing the capacity of the queue must wake up
  1308 
  1331    * the _chain function, it might have more room now
  1309   switch (transition) {
  1332    * to store the buffer/event in the queue */
  1310     case GST_STATE_CHANGE_NULL_TO_READY:
  1333   GST_QUEUE_SIGNAL_DEL (queue);
  1311       break;
  1334 }
  1312     case GST_STATE_CHANGE_READY_TO_PAUSED:
       
  1313       break;
       
  1314     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
       
  1315       break;
       
  1316     default:
       
  1317       break;
       
  1318   }
       
  1319 
       
  1320   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
       
  1321 
       
  1322   switch (transition) {
       
  1323     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
       
  1324       break;
       
  1325     case GST_STATE_CHANGE_PAUSED_TO_READY:
       
  1326       break;
       
  1327     case GST_STATE_CHANGE_READY_TO_NULL:
       
  1328       break;
       
  1329     default:
       
  1330       break;
       
  1331   }
       
  1332 
       
  1333   return ret;
       
  1334 }
       
  1335 
       
  1336 /* changing the capacity of the queue must wake up
       
  1337  * the _chain function, it might have more room now
       
  1338  * to store the buffer/event in the queue */
       
  1339 #define QUEUE_CAPACITY_CHANGE(q)\
       
  1340   GST_QUEUE_SIGNAL_DEL (q);
  1335 
  1341 
  1336 /* Changing the minimum required fill level must
  1342 /* Changing the minimum required fill level must
  1337  * wake up the _loop function as it might now
  1343  * wake up the _loop function as it might now
  1338  * be able to proceed.
  1344  * be able to proceed.
  1339  */
  1345  */
  1351   GST_QUEUE_MUTEX_LOCK (queue);
  1357   GST_QUEUE_MUTEX_LOCK (queue);
  1352 
  1358 
  1353   switch (prop_id) {
  1359   switch (prop_id) {
  1354     case ARG_MAX_SIZE_BYTES:
  1360     case ARG_MAX_SIZE_BYTES:
  1355       queue->max_size.bytes = g_value_get_uint (value);
  1361       queue->max_size.bytes = g_value_get_uint (value);
  1356       queue_capacity_change (queue);
  1362       QUEUE_CAPACITY_CHANGE (queue);
  1357       break;
  1363       break;
  1358     case ARG_MAX_SIZE_BUFFERS:
  1364     case ARG_MAX_SIZE_BUFFERS:
  1359       queue->max_size.buffers = g_value_get_uint (value);
  1365       queue->max_size.buffers = g_value_get_uint (value);
  1360       queue_capacity_change (queue);
  1366       QUEUE_CAPACITY_CHANGE (queue);
  1361       break;
  1367       break;
  1362     case ARG_MAX_SIZE_TIME:
  1368     case ARG_MAX_SIZE_TIME:
  1363       queue->max_size.time = g_value_get_uint64 (value);
  1369       queue->max_size.time = g_value_get_uint64 (value);
  1364       queue_capacity_change (queue);
  1370       QUEUE_CAPACITY_CHANGE (queue);
  1365       break;
  1371       break;
  1366     case ARG_MIN_THRESHOLD_BYTES:
  1372     case ARG_MIN_THRESHOLD_BYTES:
  1367       queue->min_threshold.bytes = g_value_get_uint (value);
  1373       queue->min_threshold.bytes = g_value_get_uint (value);
  1368       queue->orig_min_threshold.bytes = queue->min_threshold.bytes;
  1374       queue->orig_min_threshold.bytes = queue->min_threshold.bytes;
  1369       QUEUE_THRESHOLD_CHANGE (queue);
  1375       QUEUE_THRESHOLD_CHANGE (queue);