gstreamer_core/plugins/elements/gstmultiqueue.c
changeset 8 4a7fac7dd34a
parent 0 0e761a78d257
child 30 7e817e7e631c
equal deleted inserted replaced
7:71e347f905f2 8:4a7fac7dd34a
    21  * Boston, MA 02111-1307, USA.
    21  * Boston, MA 02111-1307, USA.
    22  */
    22  */
    23 
    23 
    24 /**
    24 /**
    25  * SECTION:element-multiqueue
    25  * SECTION:element-multiqueue
    26  * @short_description: Asynchronous data queues
       
    27  * @see_also: #GstQueue
    26  * @see_also: #GstQueue
    28  *
    27  *
    29  * <refsect2>
    28  * <refsect2>
    30  * <para>
    29  * <para>
    31  * Multiqueue is similar to a normal #GstQueue with the following additional
    30  * Multiqueue is similar to a normal #GstQueue with the following additional
    33  * <orderedlist>
    32  * <orderedlist>
    34  * <listitem>
    33  * <listitem>
    35  *   <itemizedlist><title>Multiple stream handling</title>
    34  *   <itemizedlist><title>Multiple stream handling</title>
    36  *   <listitem><para>
    35  *   <listitem><para>
    37  *     The element handles queueing data on more than one stream at once. To
    36  *     The element handles queueing data on more than one stream at once. To
    38  *     achieve such a feature it has request sink pads (sink_%d) and
    37  *     achieve such a feature it has request sink pads (sink%d) and
    39  *     'sometimes' src pads (src_%d).
    38  *     'sometimes' src pads (src%d).
    40  *   </para><para>
    39  *   </para><para>
    41  *     When requesting a given sinkpad with gst_element_get_request_pad(),
    40  *     When requesting a given sinkpad with gst_element_get_request_pad(),
    42  *     the associated srcpad for that stream will be created.
    41  *     the associated srcpad for that stream will be created.
    43  *     Ex: requesting sink_1 will generate src_1.
    42  *     Example: requesting sink1 will generate src1.
    44  *   </para></listitem>
    43  *   </para></listitem>
    45  *   </itemizedlist>
    44  *   </itemizedlist>
    46  * </listitem>
    45  * </listitem>
    47  * <listitem>
    46  * <listitem>
    48  *   <itemizedlist><title>Non-starvation on multiple streams</title>
    47  *   <itemizedlist><title>Non-starvation on multiple streams</title>
   110 #endif
   109 #endif
   111 
   110 
   112 #include <gst/gst.h>
   111 #include <gst/gst.h>
   113 #include "gstmultiqueue.h"
   112 #include "gstmultiqueue.h"
   114 
   113 
   115 #ifdef __SYMBIAN32__
       
   116 #include <glib_global.h>
       
   117 #include <gobject_global.h>
       
   118 #endif
       
   119 
       
   120 /**
   114 /**
   121  * GstSingleQueue:
   115  * GstSingleQueue:
   122  * @sinkpad: associated sink #GstPad
   116  * @sinkpad: associated sink #GstPad
   123  * @srcpad: associated source #GstPad
   117  * @srcpad: associated source #GstPad
   124  *
   118  *
   173 static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue);
   167 static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue);
   174 static void gst_single_queue_free (GstSingleQueue * squeue);
   168 static void gst_single_queue_free (GstSingleQueue * squeue);
   175 
   169 
   176 static void wake_up_next_non_linked (GstMultiQueue * mq);
   170 static void wake_up_next_non_linked (GstMultiQueue * mq);
   177 static void compute_high_id (GstMultiQueue * mq);
   171 static void compute_high_id (GstMultiQueue * mq);
       
   172 static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
   178 
   173 
   179 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d",
   174 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d",
   180     GST_PAD_SINK,
   175     GST_PAD_SINK,
   181     GST_PAD_REQUEST,
   176     GST_PAD_REQUEST,
   182     GST_STATIC_CAPS_ANY);
   177     GST_STATIC_CAPS_ANY);
   275       GST_DEBUG_FUNCPTR (gst_multi_queue_set_property);
   270       GST_DEBUG_FUNCPTR (gst_multi_queue_set_property);
   276   gobject_class->get_property =
   271   gobject_class->get_property =
   277       GST_DEBUG_FUNCPTR (gst_multi_queue_get_property);
   272       GST_DEBUG_FUNCPTR (gst_multi_queue_get_property);
   278 
   273 
   279   /* SIGNALS */
   274   /* SIGNALS */
       
   275 
       
   276   /**
       
   277    * GstMultiQueue::underrun:
       
   278    * @multiqueue: the multiqueue instance
       
   279    *
       
   280    * This signal is emitted from the streaming thread when there is
       
   281    * no data in any of the queues inside the multiqueue instance (underrun).
       
   282    *
       
   283    * This indicates either starvation or EOS from the upstream data sources.
       
   284    */
   280   gst_multi_queue_signals[SIGNAL_UNDERRUN] =
   285   gst_multi_queue_signals[SIGNAL_UNDERRUN] =
   281       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
   286       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
   282       G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
   287       G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
   283       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   288       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   284 
   289 
       
   290   /**
       
   291    * GstMultiQueue::overrun:
       
   292    * @multiqueue: the multiqueue instance
       
   293    *
       
   294    * Reports that one of the queues in the multiqueue is full (overrun).
       
   295    * A queue is full if the total amount of data inside it (num-buffers, time,
       
   296    * size) is higher than the boundary values which can be set through the
       
   297    * GObject properties.
       
   298    *
       
   299    * This can be used as an indicator of pre-roll. 
       
   300    */
   285   gst_multi_queue_signals[SIGNAL_OVERRUN] =
   301   gst_multi_queue_signals[SIGNAL_OVERRUN] =
   286       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
   302       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
   287       G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
   303       G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
   288       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   304       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
   289 
   305 
   290   /* PROPERTIES */
   306   /* PROPERTIES */
   291 
   307 
   292   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
   308   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BYTES,
   293       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
   309       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
   294           "Max. amount of data in the queue (bytes, 0=disable)",
   310           "Max. amount of data in the queue (bytes, 0=disable)",
   295           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE));
   311           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
       
   312           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   296   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
   313   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_BUFFERS,
   297       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
   314       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
   298           "Max. number of buffers in the queue (0=disable)",
   315           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
   299           0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE));
   316           DEFAULT_MAX_SIZE_BUFFERS,
       
   317           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   300   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
   318   g_object_class_install_property (gobject_class, ARG_MAX_SIZE_TIME,
   301       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
   319       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
   302           "Max. amount of data in the queue (in ns, 0=disable)",
   320           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
   303           0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE));
   321           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   304 
   322 
   305   g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES,
   323   g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BYTES,
   306       g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
   324       g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
   307           "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)",
   325           "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)",
   308           0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, G_PARAM_READWRITE));
   326           0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES,
       
   327           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   309   g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS,
   328   g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_BUFFERS,
   310       g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
   329       g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
   311           "Amount of buffers the queues can grow if one of them is empty (0=disable)",
   330           "Amount of buffers the queues can grow if one of them is empty (0=disable)",
   312           0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, G_PARAM_READWRITE));
   331           0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS,
       
   332           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   313   g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME,
   333   g_object_class_install_property (gobject_class, ARG_EXTRA_SIZE_TIME,
   314       g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
   334       g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
   315           "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)",
   335           "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)",
   316           0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, G_PARAM_READWRITE));
   336           0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME,
       
   337           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   317 
   338 
   318   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize);
   339   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize);
   319 
   340 
   320   gstelement_class->request_new_pad =
   341   gstelement_class->request_new_pad =
   321       GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
   342       GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
   603   gint64 sink_time, src_time;
   624   gint64 sink_time, src_time;
   604 
   625 
   605   sink_time =
   626   sink_time =
   606       gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
   627       gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
   607       sq->sink_segment.last_stop);
   628       sq->sink_segment.last_stop);
       
   629   if (sink_time == GST_CLOCK_TIME_NONE)
       
   630     goto beach;
   608 
   631 
   609   src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME,
   632   src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME,
   610       sq->src_segment.last_stop);
   633       sq->src_segment.last_stop);
       
   634   if (src_time == GST_CLOCK_TIME_NONE)
       
   635     goto beach;
   611 
   636 
   612   GST_DEBUG_OBJECT (mq,
   637   GST_DEBUG_OBJECT (mq,
   613       "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
   638       "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
   614       GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
   639       GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
   615 
   640 
   616   /* This allows for streams with out of order timestamping - sometimes the 
   641   /* This allows for streams with out of order timestamping - sometimes the
   617    * emerging timestamp is later than the arriving one(s) */
   642    * emerging timestamp is later than the arriving one(s) */
   618   if (sink_time >= src_time)
   643   if (sink_time < src_time)
   619     sq->cur_time = sink_time - src_time;
   644     goto beach;
   620   else
   645 
   621     sq->cur_time = 0;
   646   sq->cur_time = sink_time - src_time;
       
   647   return;
       
   648 
       
   649 beach:
       
   650   sq->cur_time = 0;
   622 }
   651 }
   623 
   652 
   624 /* take a NEWSEGMENT event and apply the values to segment, updating the time
   653 /* take a NEWSEGMENT event and apply the values to segment, updating the time
   625  * level of queue. */
   654  * level of queue. */
   626 static void
   655 static void
   765 static void
   794 static void
   766 gst_multi_queue_item_destroy (GstMultiQueueItem * item)
   795 gst_multi_queue_item_destroy (GstMultiQueueItem * item)
   767 {
   796 {
   768   if (item->object)
   797   if (item->object)
   769     gst_mini_object_unref (item->object);
   798     gst_mini_object_unref (item->object);
   770   g_free (item);
   799   g_slice_free (GstMultiQueueItem, item);
   771 }
   800 }
   772 
   801 
   773 /* takes ownership of passed mini object! */
   802 /* takes ownership of passed mini object! */
   774 static GstMultiQueueItem *
   803 static GstMultiQueueItem *
   775 gst_multi_queue_item_new (GstMiniObject * object, guint32 curid)
   804 gst_multi_queue_item_new (GstMiniObject * object, guint32 curid)
   776 {
   805 {
   777   GstMultiQueueItem *item;
   806   GstMultiQueueItem *item;
   778 
   807 
   779   item = g_new (GstMultiQueueItem, 1);
   808   item = g_slice_new (GstMultiQueueItem);
   780   item->object = object;
   809   item->object = object;
   781   item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
   810   item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
   782   item->posid = curid;
   811   item->posid = curid;
   783 
   812 
   784   if (GST_IS_BUFFER (object)) {
   813   if (GST_IS_BUFFER (object)) {
  1060    * a lock, the _check_full happens from this thread only, right before pushing
  1089    * a lock, the _check_full happens from this thread only, right before pushing
  1061    * into dataqueue. */
  1090    * into dataqueue. */
  1062   switch (type) {
  1091   switch (type) {
  1063     case GST_EVENT_EOS:
  1092     case GST_EVENT_EOS:
  1064       sq->is_eos = TRUE;
  1093       sq->is_eos = TRUE;
       
  1094       single_queue_overrun_cb (sq->queue, sq);
  1065       break;
  1095       break;
  1066     case GST_EVENT_NEWSEGMENT:
  1096     case GST_EVENT_NEWSEGMENT:
  1067       apply_segment (mq, sq, sref, &sq->sink_segment);
  1097       apply_segment (mq, sq, sref, &sq->sink_segment);
  1068       gst_event_unref (sref);
  1098       gst_event_unref (sref);
  1069       break;
  1099       break;
  1265     GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id);
  1295     GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id);
  1266 
  1296 
  1267     if (gst_data_queue_is_empty (ssq->queue)) {
  1297     if (gst_data_queue_is_empty (ssq->queue)) {
  1268       GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id);
  1298       GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id);
  1269       if (IS_FILLED (visible, size.visible)) {
  1299       if (IS_FILLED (visible, size.visible)) {
  1270         sq->max_size.visible++;
  1300         sq->max_size.visible = size.visible + 1;
  1271         GST_DEBUG_OBJECT (mq,
  1301         GST_DEBUG_OBJECT (mq,
  1272             "Another queue is empty, bumping single queue %d max visible to %d",
  1302             "Another queue is empty, bumping single queue %d max visible to %d",
  1273             sq->id, sq->max_size.visible);
  1303             sq->id, sq->max_size.visible);
  1274       }
  1304       }
  1275       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
  1305       GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
  1282         "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
  1312         "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
  1283         G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible,
  1313         G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible,
  1284         ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
  1314         ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
  1285 
  1315 
  1286     /* if this queue is filled completely we must signal overrun */
  1316     /* if this queue is filled completely we must signal overrun */
  1287     if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, sq->cur_time)) {
  1317     if (sq->is_eos || IS_FILLED (bytes, ssize.bytes) ||
       
  1318         IS_FILLED (time, sq->cur_time)) {
  1288       GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id);
  1319       GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id);
  1289       filled = TRUE;
  1320       filled = TRUE;
  1290     }
  1321     }
  1291   }
  1322   }
  1292   /* no queues were empty */
  1323   /* no queues were empty */
  1319     if (gst_data_queue_is_full (sq->queue)) {
  1350     if (gst_data_queue_is_full (sq->queue)) {
  1320       GstDataQueueSize size;
  1351       GstDataQueueSize size;
  1321 
  1352 
  1322       gst_data_queue_get_level (sq->queue, &size);
  1353       gst_data_queue_get_level (sq->queue, &size);
  1323       if (IS_FILLED (visible, size.visible)) {
  1354       if (IS_FILLED (visible, size.visible)) {
  1324         sq->max_size.visible++;
  1355         sq->max_size.visible = size.visible + 1;
  1325         GST_DEBUG_OBJECT (mq,
  1356         GST_DEBUG_OBJECT (mq,
  1326             "queue %d is filled, bumping its max visible to %d", sq->id,
  1357             "queue %d is filled, bumping its max visible to %d", sq->id,
  1327             sq->max_size.visible);
  1358             sq->max_size.visible);
  1328         gst_data_queue_limits_changed (sq->queue);
  1359         gst_data_queue_limits_changed (sq->queue);
  1329       }
  1360       }
  1355 
  1386 
  1356   /* we never go past the max visible items */
  1387   /* we never go past the max visible items */
  1357   if (IS_FILLED (visible, visible))
  1388   if (IS_FILLED (visible, visible))
  1358     return TRUE;
  1389     return TRUE;
  1359 
  1390 
  1360   if (sq->cur_time != 0) {
  1391   /* check time or bytes */
  1361     /* if we have valid time in the queue, check */
  1392   res = IS_FILLED (time, sq->cur_time) || IS_FILLED (bytes, bytes);
  1362     res = IS_FILLED (time, sq->cur_time);
  1393 
  1363   } else {
       
  1364     /* no valid time, check bytes */
       
  1365     res = IS_FILLED (bytes, bytes);
       
  1366   }
       
  1367   return res;
  1394   return res;
  1368 }
  1395 }
  1369 
  1396 
  1370 static void
  1397 static void
  1371 gst_single_queue_free (GstSingleQueue * sq)
  1398 gst_single_queue_free (GstSingleQueue * sq)
  1453       GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links));
  1480       GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links));
  1454 
  1481 
  1455   gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
  1482   gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
  1456   gst_pad_set_element_private (sq->srcpad, (gpointer) sq);
  1483   gst_pad_set_element_private (sq->srcpad, (gpointer) sq);
  1457 
  1484 
  1458   gst_pad_set_active (sq->srcpad, TRUE);
  1485   /* only activate the pads when we are not in the NULL state
       
  1486    * and add the pad under the state_lock to prevent state changes
       
  1487    * between activating and adding */
       
  1488   g_static_rec_mutex_lock (GST_STATE_GET_LOCK (mqueue));
       
  1489   if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) {
       
  1490     gst_pad_set_active (sq->srcpad, TRUE);
       
  1491     gst_pad_set_active (sq->sinkpad, TRUE);
       
  1492   }
  1459   gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
  1493   gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
  1460 
       
  1461   gst_pad_set_active (sq->sinkpad, TRUE);
       
  1462   gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad);
  1494   gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad);
       
  1495   g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (mqueue));
  1463 
  1496 
  1464   GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
  1497   GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
  1465       sq->id);
  1498       sq->id);
  1466 
  1499 
  1467   return sq;
  1500   return sq;