gstreamer_core/plugins/elements/gstmultiqueue.c
branch RCL_3
changeset 30 7e817e7e631c
parent 29 567bb019e3e3
equal deleted inserted replaced
29:567bb019e3e3 30:7e817e7e631c
    21  * Boston, MA 02111-1307, USA.
    21  * Boston, MA 02111-1307, USA.
    22  */
    22  */
    23 
    23 
    24 /**
    24 /**
    25  * SECTION:element-multiqueue
    25  * SECTION:element-multiqueue
       
    26  * @short_description: Asynchronous data queues
    26  * @see_also: #GstQueue
    27  * @see_also: #GstQueue
    27  *
    28  *
    28  * <refsect2>
    29  * <refsect2>
    29  * <para>
    30  * <para>
    30  * Multiqueue is similar to a normal #GstQueue with the following additional
    31  * Multiqueue is similar to a normal #GstQueue with the following additional
    32  * <orderedlist>
    33  * <orderedlist>
    33  * <listitem>
    34  * <listitem>
    34  *   <itemizedlist><title>Multiple stream handling</title>
    35  *   <itemizedlist><title>Multiple stream handling</title>
    35  *   <listitem><para>
    36  *   <listitem><para>
    36  *     The element handles queueing data on more than one stream at once. To
    37  *     The element handles queueing data on more than one stream at once. To
    37  *     achieve such a feature it has request sink pads (sink%d) and
    38  *     achieve such a feature it has request sink pads (sink_%d) and
    38  *     'sometimes' src pads (src%d).
    39  *     'sometimes' src pads (src_%d).
    39  *   </para><para>
    40  *   </para><para>
    40  *     When requesting a given sinkpad with gst_element_get_request_pad(),
    41  *     When requesting a given sinkpad with gst_element_get_request_pad(),
    41  *     the associated srcpad for that stream will be created.
    42  *     the associated srcpad for that stream will be created.
    42  *     Example: requesting sink1 will generate src1.
    43  *     Ex: requesting sink_1 will generate src_1.
    43  *   </para></listitem>
    44  *   </para></listitem>
    44  *   </itemizedlist>
    45  *   </itemizedlist>
    45  * </listitem>
    46  * </listitem>
    46  * <listitem>
    47  * <listitem>
    47  *   <itemizedlist><title>Non-starvation on multiple streams</title>
    48  *   <itemizedlist><title>Non-starvation on multiple streams</title>
   109 #endif
   110 #endif
   110 
   111 
   111 #include <gst/gst.h>
   112 #include <gst/gst.h>
   112 #include "gstmultiqueue.h"
   113 #include "gstmultiqueue.h"
   113 
   114 
       
   115 #ifdef __SYMBIAN32__
       
   116 #include <glib_global.h>
       
   117 #include <gobject_global.h>
       
   118 #endif
       
   119 
   114 /**
   120 /**
   115  * GstSingleQueue:
   121  * GstSingleQueue:
   116  * @sinkpad: associated sink #GstPad
   122  * @sinkpad: associated sink #GstPad
   117  * @srcpad: associated source #GstPad
   123  * @srcpad: associated source #GstPad
   118  *
   124  *
   167 static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue);
   173 static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue);
   168 static void gst_single_queue_free (GstSingleQueue * squeue);
   174 static void gst_single_queue_free (GstSingleQueue * squeue);
   169 
   175 
   170 static void wake_up_next_non_linked (GstMultiQueue * mq);
   176 static void wake_up_next_non_linked (GstMultiQueue * mq);
   171 static void compute_high_id (GstMultiQueue * mq);
   177 static void compute_high_id (GstMultiQueue * mq);
   172 static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
       
   173 
   178 
   174 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d",
   179 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d",
   175     GST_PAD_SINK,
   180     GST_PAD_SINK,
   176     GST_PAD_REQUEST,
   181     GST_PAD_REQUEST,
   177     GST_STATIC_CAPS_ANY);
   182     GST_STATIC_CAPS_ANY);
   270       GST_DEBUG_FUNCPTR (gst_multi_queue_set_property);
   275       GST_DEBUG_FUNCPTR (gst_multi_queue_set_property);
   271   gobject_class->get_property =
   276   gobject_class->get_property =
   272       GST_DEBUG_FUNCPTR (gst_multi_queue_get_property);
   277       GST_DEBUG_FUNCPTR (gst_multi_queue_get_property);
   273 
   278 
   274   /* SIGNALS */
   279   /* SIGNALS */
   275 
       
   276   /**
       
   277    * GstMultiQueue::underrun:
       
   278    * @multiqueue: the multiqueue instance
       
   279    *
       
   280    * This signal is emitted from the streaming thread when there is
       
   281    * no data in any of the queues inside the multiqueue instance (underrun).
       
   282    *
       
   283    * This indicates either starvation or EOS from the upstream data sources.
       
   284    */
       
   285   gst_multi_queue_signals[SIGNAL_UNDERRUN] =
   280   gst_multi_queue_signals[SIGNAL_UNDERRUN] =
   286       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
   281       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
   287       G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
   282       G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
   288       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   283       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   289 
   284 
   290   /**
       
   291    * GstMultiQueue::overrun:
       
   292    * @multiqueue: the multiqueue instance
       
   293    *
       
   294    * Reports that one of the queues in the multiqueue is full (overrun).
       
   295    * A queue is full if the total amount of data inside it (num-buffers, time,
       
   296    * size) is higher than the boundary values which can be set through the
       
   297    * GObject properties.
       
   298    *
       
   299    * This can be used as an indicator of pre-roll. 
       
   300    */
       
   301   gst_multi_queue_signals[SIGNAL_OVERRUN] =
   285   gst_multi_queue_signals[SIGNAL_OVERRUN] =
   302       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
   286       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
   303       G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
   287       G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
   304       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   288       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   305 
   289 
   306   /* PROPERTIES */
   290   /* PROPERTIES */
   307 
   291 
   308   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
   292   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
   309       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
   293       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
   310           "Max. amount of data in the queue (bytes, 0=disable)",
   294           "Max. amount of data in the queue (bytes, 0=disable)",
   311           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
   295           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE));
   312           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   313   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
   296   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
   314       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
   297       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
   315           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
   298           "Max. number of buffers in the queue (0=disable)",
   316           DEFAULT_MAX_SIZE_BUFFERS,
   299           0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE));
   317           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   318   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
   300   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
   319       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
   301       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
   320           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
   302           "Max. amount of data in the queue (in ns, 0=disable)",
   321           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   303           0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE));
   322 
   304 
   323   g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES,
   305   g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES,
   324       g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
   306       g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
   325           "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)",
   307           "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)",
   326           0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES,
   308           0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, G_PARAM_READWRITE));
   327           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   328   g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS,
   309   g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS,
   329       g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
   310       g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
   330           "Amount of buffers the queues can grow if one of them is empty (0=disable)",
   311           "Amount of buffers the queues can grow if one of them is empty (0=disable)",
   331           0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS,
   312           0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, G_PARAM_READWRITE));
   332           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   333   g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME,
   313   g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME,
   334       g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
   314       g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
   335           "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)",
   315           "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)",
   336           0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME,
   316           0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, G_PARAM_READWRITE));
   337           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   338 
   317 
   339   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize);
   318   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize);
   340 
   319 
   341   gstelement_class->request_new_pad =
   320   gstelement_class->request_new_pad =
   342       GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
   321       GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
   624   gint64 sink_time, src_time;
   603   gint64 sink_time, src_time;
   625 
   604 
   626   sink_time =
   605   sink_time =
   627       gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
   606       gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
   628       sq->sink_segment.last_stop);
   607       sq->sink_segment.last_stop);
   629   if (sink_time == GST_CLOCK_TIME_NONE)
       
   630     goto beach;
       
   631 
   608 
   632   src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME,
   609   src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME,
   633       sq->src_segment.last_stop);
   610       sq->src_segment.last_stop);
   634   if (src_time == GST_CLOCK_TIME_NONE)
       
   635     goto beach;
       
   636 
   611 
   637   GST_DEBUG_OBJECT (mq,
   612   GST_DEBUG_OBJECT (mq,
   638       "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
   613       "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
   639       GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
   614       GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
   640 
   615 
   641   /* This allows for streams with out of order timestamping - sometimes the
   616   /* This allows for streams with out of order timestamping - sometimes the 
   642    * emerging timestamp is later than the arriving one(s) */
   617    * emerging timestamp is later than the arriving one(s) */
   643   if (sink_time < src_time)
   618   if (sink_time >= src_time)
   644     goto beach;
   619     sq->cur_time = sink_time - src_time;
   645 
   620   else
   646   sq->cur_time = sink_time - src_time;
   621     sq->cur_time = 0;
   647   return;
       
   648 
       
   649 beach:
       
   650   sq->cur_time = 0;
       
   651 }
   622 }
   652 
   623 
   653 /* take a NEWSEGMENT event and apply the values to segment, updating the time
   624 /* take a NEWSEGMENT event and apply the values to segment, updating the time
   654  * level of queue. */
   625  * level of queue. */
   655 static void
   626 static void
   794 static void
   765 static void
   795 gst_multi_queue_item_destroy (GstMultiQueueItem * item)
   766 gst_multi_queue_item_destroy (GstMultiQueueItem * item)
   796 {
   767 {
   797   if (item->object)
   768   if (item->object)
   798     gst_mini_object_unref (item->object);
   769     gst_mini_object_unref (item->object);
   799   g_slice_free (GstMultiQueueItem, item);
   770   g_free (item);
   800 }
   771 }
   801 
   772 
   802 /* takes ownership of passed mini object! */
   773 /* takes ownership of passed mini object! */
   803 static GstMultiQueueItem *
   774 static GstMultiQueueItem *
   804 gst_multi_queue_item_new (GstMiniObject * object, guint32 curid)
   775 gst_multi_queue_item_new (GstMiniObject * object, guint32 curid)
   805 {
   776 {
   806   GstMultiQueueItem *item;
   777   GstMultiQueueItem *item;
   807 
   778 
   808   item = g_slice_new (GstMultiQueueItem);
   779   item = g_new (GstMultiQueueItem, 1);
   809   item->object = object;
   780   item->object = object;
   810   item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
   781   item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
   811   item->posid = curid;
   782   item->posid = curid;
   812 
   783 
   813   if (GST_IS_BUFFER (object)) {
   784   if (GST_IS_BUFFER (object)) {
  1089    * a lock, the _check_full happens from this thread only, right before pushing
  1060    * a lock, the _check_full happens from this thread only, right before pushing
  1090    * into dataqueue. */
  1061    * into dataqueue. */
  1091   switch (type) {
  1062   switch (type) {
  1092     case GST_EVENT_EOS:
  1063     case GST_EVENT_EOS:
  1093       sq->is_eos = TRUE;
  1064       sq->is_eos = TRUE;
  1094       single_queue_overrun_cb (sq->queue, sq);
       
  1095       break;
  1065       break;
  1096     case GST_EVENT_NEWSEGMENT:
  1066     case GST_EVENT_NEWSEGMENT:
  1097       apply_segment (mq, sq, sref, &sq->sink_segment);
  1067       apply_segment (mq, sq, sref, &sq->sink_segment);
  1098       gst_event_unref (sref);
  1068       gst_event_unref (sref);
  1099       break;
  1069       break;
  1295     GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id);
  1265     GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id);
  1296 
  1266 
  1297     if (gst_data_queue_is_empty (ssq->queue)) {
  1267     if (gst_data_queue_is_empty (ssq->queue)) {
  1298       GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id);
  1268       GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id);
  1299       if (IS_FILLED (visible, size.visible)) {
  1269       if (IS_FILLED (visible, size.visible)) {
  1300         sq->max_size.visible = size.visible + 1;
  1270         sq->max_size.visible++;
  1301         GST_DEBUG_OBJECT (mq,
  1271         GST_DEBUG_OBJECT (mq,
  1302             "Another queue is empty, bumping single queue %d max visible to %d",
  1272             "Another queue is empty, bumping single queue %d max visible to %d",
  1303             sq->id, sq->max_size.visible);
  1273             sq->id, sq->max_size.visible);
  1304       }
  1274       }
  1305       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
  1275       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
  1312         "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
  1282         "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
  1313         G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible,
  1283         G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible,
  1314         ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
  1284         ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
  1315 
  1285 
  1316     /* if this queue is filled completely we must signal overrun */
  1286     /* if this queue is filled completely we must signal overrun */
  1317     if (sq->is_eos || IS_FILLED (bytes, ssize.bytes) ||
  1287     if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, sq->cur_time)) {
  1318         IS_FILLED (time, sq->cur_time)) {
       
  1319       GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id);
  1288       GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id);
  1320       filled = TRUE;
  1289       filled = TRUE;
  1321     }
  1290     }
  1322   }
  1291   }
  1323   /* no queues were empty */
  1292   /* no queues were empty */
  1350     if (gst_data_queue_is_full (sq->queue)) {
  1319     if (gst_data_queue_is_full (sq->queue)) {
  1351       GstDataQueueSize size;
  1320       GstDataQueueSize size;
  1352 
  1321 
  1353       gst_data_queue_get_level (sq->queue, &size);
  1322       gst_data_queue_get_level (sq->queue, &size);
  1354       if (IS_FILLED (visible, size.visible)) {
  1323       if (IS_FILLED (visible, size.visible)) {
  1355         sq->max_size.visible = size.visible + 1;
  1324         sq->max_size.visible++;
  1356         GST_DEBUG_OBJECT (mq,
  1325         GST_DEBUG_OBJECT (mq,
  1357             "queue %d is filled, bumping its max visible to %d", sq->id,
  1326             "queue %d is filled, bumping its max visible to %d", sq->id,
  1358             sq->max_size.visible);
  1327             sq->max_size.visible);
  1359         gst_data_queue_limits_changed (sq->queue);
  1328         gst_data_queue_limits_changed (sq->queue);
  1360       }
  1329       }
  1386 
  1355 
  1387   /* we never go past the max visible items */
  1356   /* we never go past the max visible items */
  1388   if (IS_FILLED (visible, visible))
  1357   if (IS_FILLED (visible, visible))
  1389     return TRUE;
  1358     return TRUE;
  1390 
  1359 
  1391   /* check time or bytes */
  1360   if (sq->cur_time != 0) {
  1392   res = IS_FILLED (time, sq->cur_time) || IS_FILLED (bytes, bytes);
  1361     /* if we have valid time in the queue, check */
  1393 
  1362     res = IS_FILLED (time, sq->cur_time);
       
  1363   } else {
       
  1364     /* no valid time, check bytes */
       
  1365     res = IS_FILLED (bytes, bytes);
       
  1366   }
  1394   return res;
  1367   return res;
  1395 }
  1368 }
  1396 
  1369 
  1397 static void
  1370 static void
  1398 gst_single_queue_free (GstSingleQueue * sq)
  1371 gst_single_queue_free (GstSingleQueue * sq)
  1480       GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links));
  1453       GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links));
  1481 
  1454 
  1482   gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
  1455   gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
  1483   gst_pad_set_element_private (sq->srcpad, (gpointer) sq);
  1456   gst_pad_set_element_private (sq->srcpad, (gpointer) sq);
  1484 
  1457 
  1485   /* only activate the pads when we are not in the NULL state
  1458   gst_pad_set_active (sq->srcpad, TRUE);
  1486    * and add the pad under the state_lock to prevent state changes
       
  1487    * between activating and adding */
       
  1488   g_static_rec_mutex_lock (GST_STATE_GET_LOCK (mqueue));
       
  1489   if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) {
       
  1490     gst_pad_set_active (sq->srcpad, TRUE);
       
  1491     gst_pad_set_active (sq->sinkpad, TRUE);
       
  1492   }
       
  1493   gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
  1459   gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
       
  1460 
       
  1461   gst_pad_set_active (sq->sinkpad, TRUE);
  1494   gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad);
  1462   gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad);
  1495   g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (mqueue));
       
  1496 
  1463 
  1497   GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
  1464   GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
  1498       sq->id);
  1465       sq->id);
  1499 
  1466 
  1500   return sq;
  1467   return sq;