gstreamer_core/libs/gst/base/gstcollectpads.c
changeset 0 0e761a78d257
child 8 4a7fac7dd34a
equal deleted inserted replaced
-1:000000000000 0:0e761a78d257
       
     1 /* GStreamer
       
     2  * Copyright (C) 2005 Wim Taymans <wim@fluendo.com>
       
     3  *
       
     4  * gstcollectpads.c:
       
     5  *
       
     6  * This library is free software; you can redistribute it and/or
       
     7  * modify it under the terms of the GNU Library General Public
       
     8  * License as published by the Free Software Foundation; either
       
     9  * version 2 of the License, or (at your option) any later version.
       
    10  *
       
    11  * This library is distributed in the hope that it will be useful,
       
    12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
       
    14  * Library General Public License for more details.
       
    15  *
       
    16  * You should have received a copy of the GNU Library General Public
       
    17  * License along with this library; if not, write to the
       
    18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
       
    19  * Boston, MA 02111-1307, USA.
       
    20  */
       
    21 /**
       
    22  * SECTION:gstcollectpads
       
    23  * @short_description: manages a set of pads that operate in collect mode
       
    24  * @see_also:
       
    25  *
       
    26  * Manages a set of pads that operate in collect mode. This means that control
       
    27  * is given to the manager of this object when all pads have data.
       
    28  * <itemizedlist>
       
    29  *   <listitem><para>
       
    30  *     Collectpads are created with gst_collect_pads_new(). A callback should then
       
    31  *     be installed with gst_collect_pads_set_function (). 
       
    32  *   </para></listitem>
       
    33  *   <listitem><para>
       
    34  *     Pads are added to the collection with gst_collect_pads_add_pad()/
       
    35  *     gst_collect_pads_remove_pad(). The pad
       
    36  *     has to be a sinkpad. The chain and event functions of the pad are
       
    37  *     overridden. The element_private of the pad is used to store
       
    38  *     private information for the collectpads.
       
    39  *   </para></listitem>
       
    40  *   <listitem><para>
       
    41  *     For each pad, data is queued in the _chain function or by
       
    42  *     performing a pull_range.
       
    43  *   </para></listitem>
       
    44  *   <listitem><para>
       
    45  *     When data is queued on all pads, the callback function is called.
       
    46  *   </para></listitem>
       
    47  *   <listitem><para>
       
    48  *     Data can be dequeued from the pad with the gst_collect_pads_pop() method.
       
    49  *     One can peek at the data with the gst_collect_pads_peek() function.
       
    50  *     These functions will return NULL if the pad received an EOS event. When all
       
    51  *     pads return NULL from a gst_collect_pads_peek(), the element can emit an EOS
       
    52  *     event itself.
       
    53  *   </para></listitem>
       
    54  *   <listitem><para>
       
    55  *     Data can also be dequeued in byte units using the gst_collect_pads_available(), 
       
    56  *     gst_collect_pads_read() and gst_collect_pads_flush() calls.
       
    57  *   </para></listitem>
       
    58  *   <listitem><para>
       
    59  *     Elements should call gst_collect_pads_start() and gst_collect_pads_stop() in
       
    60  *     their state change functions to start and stop the processing of the collecpads.
       
    61  *     The gst_collect_pads_stop() call should be called before calling the parent
       
    62  *     element state change function in the PAUSED_TO_READY state change to ensure
       
    63  *     no pad is blocked and the element can finish streaming.
       
    64  *   </para></listitem>
       
    65  *   <listitem><para>
       
    66  *     gst_collect_pads_collect() and gst_collect_pads_collect_range() can be used by
       
    67  *     elements that start a #GstTask to drive the collect_pads. This feature is however
       
    68  *     not yet implemented.
       
    69  *   </para></listitem>
       
    70  * </itemizedlist>
       
    71  *
       
    72  * Last reviewed on 2006-05-10 (0.10.6)
       
    73  */
       
    74 
       
    75 #ifdef __SYMBIAN32__
       
    76 #include <gst_global.h>
       
    77 #endif
       
    78 #include "gstcollectpads.h"
       
    79 
       
    80 #ifdef __SYMBIAN32__
       
    81 #include <glib_global.h>
       
    82 #include <gobject_global.h>
       
    83 #endif
       
    84 GST_DEBUG_CATEGORY_STATIC (collect_pads_debug);
       
    85 #define GST_CAT_DEFAULT collect_pads_debug
       
    86 
       
    87 GST_BOILERPLATE (GstCollectPads, gst_collect_pads, GstObject, GST_TYPE_OBJECT);
       
    88 
       
    89 static void gst_collect_pads_clear (GstCollectPads * pads,
       
    90     GstCollectData * data);
       
    91 static GstFlowReturn gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer);
       
    92 static gboolean gst_collect_pads_event (GstPad * pad, GstEvent * event);
       
    93 static void gst_collect_pads_finalize (GObject * object);
       
    94 static void gst_collect_pads_init (GstCollectPads * pads,
       
    95     GstCollectPadsClass * g_class);
       
    96 static void ref_data (GstCollectData * data);
       
    97 static void unref_data (GstCollectData * data);
       
    98 
       
    99 static void
       
   100 gst_collect_pads_base_init (gpointer g_class)
       
   101 {
       
   102   /* Do nothing here */
       
   103 }
       
   104 
       
   105 static void
       
   106 gst_collect_pads_class_init (GstCollectPadsClass * klass)
       
   107 {
       
   108   GObjectClass *gobject_class = (GObjectClass *) klass;
       
   109 
       
   110   GST_DEBUG_CATEGORY_INIT (collect_pads_debug, "collectpads", 0,
       
   111       "GstCollectPads");
       
   112 
       
   113   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_collect_pads_finalize);
       
   114 }
       
   115 
       
   116 static void
       
   117 gst_collect_pads_init (GstCollectPads * pads, GstCollectPadsClass * g_class)
       
   118 {
       
   119   pads->cond = g_cond_new ();
       
   120   pads->data = NULL;
       
   121   pads->cookie = 0;
       
   122   pads->numpads = 0;
       
   123   pads->queuedpads = 0;
       
   124   pads->eospads = 0;
       
   125   pads->started = FALSE;
       
   126 
       
   127   /* members to manage the pad list */
       
   128   pads->abidata.ABI.pad_lock = g_mutex_new ();
       
   129   pads->abidata.ABI.pad_cookie = 0;
       
   130   pads->abidata.ABI.pad_list = NULL;
       
   131 }
       
   132 
       
   133 static void
       
   134 gst_collect_pads_finalize (GObject * object)
       
   135 {
       
   136   GSList *collected;
       
   137   GstCollectPads *pads = GST_COLLECT_PADS (object);
       
   138 
       
   139   GST_DEBUG ("finalize");
       
   140 
       
   141   g_cond_free (pads->cond);
       
   142   g_mutex_free (pads->abidata.ABI.pad_lock);
       
   143 
       
   144   /* Remove pads */
       
   145   collected = pads->abidata.ABI.pad_list;
       
   146   for (; collected; collected = g_slist_next (collected)) {
       
   147     GstCollectData *pdata = (GstCollectData *) collected->data;
       
   148 
       
   149     unref_data (pdata);
       
   150   }
       
   151   /* Free pads list */
       
   152   g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
       
   153   g_slist_free (pads->data);
       
   154   g_slist_free (pads->abidata.ABI.pad_list);
       
   155 
       
   156   G_OBJECT_CLASS (parent_class)->finalize (object);
       
   157 }
       
   158 
       
   159 /**
       
   160  * gst_collect_pads_new:
       
   161  *
       
   162  * Create a new instance of #GstCollectsPads.
       
   163  *
       
   164  * Returns: a new #GstCollectPads, or NULL in case of an error.
       
   165  *
       
   166  * MT safe.
       
   167  */
       
   168 #ifdef __SYMBIAN32__
       
   169 EXPORT_C
       
   170 #endif
       
   171 
       
   172 GstCollectPads *
       
   173 gst_collect_pads_new (void)
       
   174 {
       
   175   GstCollectPads *newcoll;
       
   176 
       
   177   newcoll = g_object_new (GST_TYPE_COLLECT_PADS, NULL);
       
   178 
       
   179   return newcoll;
       
   180 }
       
   181 
       
   182 /**
       
   183  * gst_collect_pads_set_function:
       
   184  * @pads: the collectspads to use
       
   185  * @func: the function to set
       
   186  * @user_data: user data passed to the function
       
   187  *
       
   188  * Set the callback function and user data that will be called when
       
   189  * all the pads added to the collection have buffers queued.
       
   190  *
       
   191  * MT safe.
       
   192  */
       
   193 #ifdef __SYMBIAN32__
       
   194 EXPORT_C
       
   195 #endif
       
   196 
       
   197 void
       
   198 gst_collect_pads_set_function (GstCollectPads * pads,
       
   199     GstCollectPadsFunction func, gpointer user_data)
       
   200 {
       
   201   g_return_if_fail (pads != NULL);
       
   202   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
       
   203 
       
   204   GST_OBJECT_LOCK (pads);
       
   205   pads->func = func;
       
   206   pads->user_data = user_data;
       
   207   GST_OBJECT_UNLOCK (pads);
       
   208 }
       
   209 
       
   210 static void
       
   211 ref_data (GstCollectData * data)
       
   212 {
       
   213   g_assert (data != NULL);
       
   214 
       
   215   g_atomic_int_inc (&(data->abidata.ABI.refcount));
       
   216 }
       
   217 
       
   218 static void
       
   219 unref_data (GstCollectData * data)
       
   220 {
       
   221   GstCollectDataDestroyNotify destroy_notify;
       
   222 
       
   223   g_assert (data != NULL);
       
   224   g_assert (data->abidata.ABI.refcount > 0);
       
   225 
       
   226   if (!g_atomic_int_dec_and_test (&(data->abidata.ABI.refcount)))
       
   227     return;
       
   228 
       
   229   /* FIXME: Ugly hack as we can't add more fields to GstCollectData */
       
   230   destroy_notify = (GstCollectDataDestroyNotify)
       
   231       g_object_get_data (G_OBJECT (data->pad),
       
   232       "gst-collect-data-destroy-notify");
       
   233 
       
   234   if (destroy_notify)
       
   235     destroy_notify (data);
       
   236 
       
   237   g_object_unref (data->pad);
       
   238   if (data->buffer) {
       
   239     gst_buffer_unref (data->buffer);
       
   240   }
       
   241   g_free (data);
       
   242 }
       
   243 
       
   244 /**
       
   245  * gst_collect_pads_add_pad:
       
   246  * @pads: the collectspads to use
       
   247  * @pad: the pad to add
       
   248  * @size: the size of the returned #GstCollectData structure
       
   249  *
       
   250  * Add a pad to the collection of collect pads. The pad has to be
       
   251  * a sinkpad. The refcount of the pad is incremented. Use
       
   252  * gst_collect_pads_remove_pad() to remove the pad from the collection
       
   253  * again.
       
   254  *
       
   255  * You specify a size for the returned #GstCollectData structure
       
   256  * so that you can use it to store additional information.
       
   257  *
       
   258  * The pad will be automatically activated in push mode when @pads is
       
   259  * started.
       
   260  *
       
   261  * This function calls gst_collect_pads_add_pad() passing a value of NULL
       
   262  * for destroy_notify.
       
   263  *
       
   264  * Returns: a new #GstCollectData to identify the new pad. Or NULL
       
   265  *   if wrong parameters are supplied.
       
   266  *
       
   267  * MT safe.
       
   268  */
       
   269 #ifdef __SYMBIAN32__
       
   270 EXPORT_C
       
   271 #endif
       
   272 
       
   273 GstCollectData *
       
   274 gst_collect_pads_add_pad (GstCollectPads * pads, GstPad * pad, guint size)
       
   275 {
       
   276   return gst_collect_pads_add_pad_full (pads, pad, size, NULL);
       
   277 }
       
   278 
       
   279 /**
       
   280  * gst_collect_pads_add_pad_full:
       
   281  * @pads: the collectspads to use
       
   282  * @pad: the pad to add
       
   283  * @size: the size of the returned #GstCollectData structure
       
   284  * @destroy_notify: function to be called before the returned #GstCollectData
       
   285  * structure is freed
       
   286  *
       
   287  * Add a pad to the collection of collect pads. The pad has to be
       
   288  * a sinkpad. The refcount of the pad is incremented. Use
       
   289  * gst_collect_pads_remove_pad() to remove the pad from the collection
       
   290  * again.
       
   291  *
       
   292  * You specify a size for the returned #GstCollectData structure
       
   293  * so that you can use it to store additional information.
       
   294  *
       
   295  * You can also specify a #GstCollectDataDestroyNotify that will be called
       
   296  * just before the #GstCollectData structure is freed. It is passed the
       
   297  * pointer to the structure and should free any custom memory and resources
       
   298  * allocated for it.
       
   299  *
       
   300  * The pad will be automatically activated in push mode when @pads is
       
   301  * started.
       
   302  *
       
   303  * Since: 0.10.12
       
   304  *
       
   305  * Returns: a new #GstCollectData to identify the new pad. Or NULL
       
   306  *   if wrong parameters are supplied.
       
   307  *
       
   308  * MT safe.
       
   309  */
       
   310 #ifdef __SYMBIAN32__
       
   311 EXPORT_C
       
   312 #endif
       
   313 
       
   314 GstCollectData *
       
   315 gst_collect_pads_add_pad_full (GstCollectPads * pads, GstPad * pad, guint size,
       
   316     GstCollectDataDestroyNotify destroy_notify)
       
   317 {
       
   318   GstCollectData *data;
       
   319 
       
   320   g_return_val_if_fail (pads != NULL, NULL);
       
   321   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
       
   322   g_return_val_if_fail (pad != NULL, NULL);
       
   323   g_return_val_if_fail (GST_PAD_IS_SINK (pad), NULL);
       
   324   g_return_val_if_fail (size >= sizeof (GstCollectData), NULL);
       
   325 
       
   326   GST_DEBUG ("adding pad %s:%s", GST_DEBUG_PAD_NAME (pad));
       
   327 
       
   328   data = g_malloc0 (size);
       
   329   data->collect = pads;
       
   330   data->pad = gst_object_ref (pad);
       
   331   data->buffer = NULL;
       
   332   data->pos = 0;
       
   333   gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
       
   334   data->abidata.ABI.flushing = FALSE;
       
   335   data->abidata.ABI.new_segment = FALSE;
       
   336   data->abidata.ABI.eos = FALSE;
       
   337   data->abidata.ABI.refcount = 1;
       
   338 
       
   339   /* FIXME: Ugly hack as we can't add more fields to GstCollectData */
       
   340   g_object_set_data (G_OBJECT (pad), "gst-collect-data-destroy-notify",
       
   341       (void *) destroy_notify);
       
   342 
       
   343   GST_COLLECT_PADS_PAD_LOCK (pads);
       
   344   GST_OBJECT_LOCK (pad);
       
   345   gst_pad_set_element_private (pad, data);
       
   346   GST_OBJECT_UNLOCK (pad);
       
   347   pads->abidata.ABI.pad_list =
       
   348       g_slist_append (pads->abidata.ABI.pad_list, data);
       
   349   gst_pad_set_chain_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_chain));
       
   350   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_collect_pads_event));
       
   351   /* activate the pad when needed */
       
   352   if (pads->started)
       
   353     gst_pad_set_active (pad, TRUE);
       
   354   pads->abidata.ABI.pad_cookie++;
       
   355   GST_COLLECT_PADS_PAD_UNLOCK (pads);
       
   356 
       
   357   return data;
       
   358 }
       
   359 
       
   360 static gint
       
   361 find_pad (GstCollectData * data, GstPad * pad)
       
   362 {
       
   363   if (data->pad == pad)
       
   364     return 0;
       
   365   return 1;
       
   366 }
       
   367 
       
   368 /**
       
   369  * gst_collect_pads_remove_pad:
       
   370  * @pads: the collectspads to use
       
   371  * @pad: the pad to remove
       
   372  *
       
   373  * Remove a pad from the collection of collect pads. This function will also
       
   374  * free the #GstCollectData and all the resources that were allocated with 
       
   375  * gst_collect_pads_add_pad().
       
   376  *
       
   377  * The pad will be deactivated automatically when @pads is stopped.
       
   378  *
       
   379  * Returns: %TRUE if the pad could be removed.
       
   380  *
       
   381  * MT safe.
       
   382  */
       
   383 #ifdef __SYMBIAN32__
       
   384 EXPORT_C
       
   385 #endif
       
   386 
       
   387 gboolean
       
   388 gst_collect_pads_remove_pad (GstCollectPads * pads, GstPad * pad)
       
   389 {
       
   390   GstCollectData *data;
       
   391   GSList *list;
       
   392 
       
   393   g_return_val_if_fail (pads != NULL, FALSE);
       
   394   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
       
   395   g_return_val_if_fail (pad != NULL, FALSE);
       
   396   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
       
   397 
       
   398   GST_DEBUG ("removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
       
   399 
       
   400   GST_COLLECT_PADS_PAD_LOCK (pads);
       
   401   list =
       
   402       g_slist_find_custom (pads->abidata.ABI.pad_list, pad,
       
   403       (GCompareFunc) find_pad);
       
   404   if (!list)
       
   405     goto unknown_pad;
       
   406 
       
   407   data = (GstCollectData *) list->data;
       
   408 
       
   409   GST_DEBUG ("found pad %s:%s at %p", GST_DEBUG_PAD_NAME (pad), data);
       
   410 
       
   411   /* clear the stuff we configured */
       
   412   gst_pad_set_chain_function (pad, NULL);
       
   413   gst_pad_set_event_function (pad, NULL);
       
   414   GST_OBJECT_LOCK (pad);
       
   415   gst_pad_set_element_private (pad, NULL);
       
   416   GST_OBJECT_UNLOCK (pad);
       
   417 
       
   418   /* backward compat, also remove from data if stopped, note that this function
       
   419    * can only be called when we are stopped because we don't take the LOCK to
       
   420    * protect the pads->data list. */
       
   421   if (!pads->started) {
       
   422     GSList *dlist;
       
   423 
       
   424     dlist = g_slist_find_custom (pads->data, pad, (GCompareFunc) find_pad);
       
   425     if (dlist) {
       
   426       GstCollectData *pdata = dlist->data;
       
   427 
       
   428       pads->data = g_slist_delete_link (pads->data, dlist);
       
   429       unref_data (pdata);
       
   430     }
       
   431   }
       
   432   /* remove from the pad list */
       
   433   pads->abidata.ABI.pad_list =
       
   434       g_slist_delete_link (pads->abidata.ABI.pad_list, list);
       
   435   pads->abidata.ABI.pad_cookie++;
       
   436 
       
   437   /* signal waiters because something changed */
       
   438   GST_COLLECT_PADS_BROADCAST (pads);
       
   439 
       
   440   /* deactivate the pad when needed */
       
   441   if (!pads->started)
       
   442     gst_pad_set_active (pad, FALSE);
       
   443 
       
   444   /* clean and free the collect data */
       
   445   unref_data (data);
       
   446 
       
   447   GST_COLLECT_PADS_PAD_UNLOCK (pads);
       
   448 
       
   449   return TRUE;
       
   450 
       
   451 unknown_pad:
       
   452   {
       
   453     GST_WARNING ("cannot remove unknown pad %s:%s", GST_DEBUG_PAD_NAME (pad));
       
   454     GST_COLLECT_PADS_PAD_UNLOCK (pads);
       
   455     return FALSE;
       
   456   }
       
   457 }
       
   458 
       
   459 /**
       
   460  * gst_collect_pads_is_active:
       
   461  * @pads: the collectspads to use
       
   462  * @pad: the pad to check
       
   463  *
       
   464  * Check if a pad is active.
       
   465  *
       
   466  * This function is currently not implemented.
       
   467  *
       
   468  * Returns: %TRUE if the pad is active.
       
   469  *
       
   470  * MT safe.
       
   471  */
       
   472 #ifdef __SYMBIAN32__
       
   473 EXPORT_C
       
   474 #endif
       
   475 
       
   476 gboolean
       
   477 gst_collect_pads_is_active (GstCollectPads * pads, GstPad * pad)
       
   478 {
       
   479   g_return_val_if_fail (pads != NULL, FALSE);
       
   480   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), FALSE);
       
   481   g_return_val_if_fail (pad != NULL, FALSE);
       
   482   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
       
   483 
       
   484   g_warning ("gst_collect_pads_is_active() is not implemented");
       
   485 
       
   486   return FALSE;
       
   487 }
       
   488 
       
   489 /**
       
   490  * gst_collect_pads_collect:
       
   491  * @pads: the collectspads to use
       
   492  *
       
   493  * Collect data on all pads. This function is usually called
       
   494  * from a #GstTask function in an element. 
       
   495  *
       
   496  * This function is currently not implemented.
       
   497  *
       
   498  * Returns: #GstFlowReturn of the operation.
       
   499  *
       
   500  * MT safe.
       
   501  */
       
   502 #ifdef __SYMBIAN32__
       
   503 EXPORT_C
       
   504 #endif
       
   505 
       
   506 GstFlowReturn
       
   507 gst_collect_pads_collect (GstCollectPads * pads)
       
   508 {
       
   509   g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
       
   510   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
       
   511 
       
   512   g_warning ("gst_collect_pads_collect() is not implemented");
       
   513 
       
   514   return GST_FLOW_NOT_SUPPORTED;
       
   515 }
       
   516 
       
   517 /**
       
   518  * gst_collect_pads_collect_range:
       
   519  * @pads: the collectspads to use
       
   520  * @offset: the offset to collect
       
   521  * @length: the length to collect
       
   522  *
       
   523  * Collect data with @offset and @length on all pads. This function
       
   524  * is typically called in the getrange function of an element. 
       
   525  *
       
   526  * This function is currently not implemented.
       
   527  *
       
   528  * Returns: #GstFlowReturn of the operation.
       
   529  *
       
   530  * MT safe.
       
   531  */
       
   532 #ifdef __SYMBIAN32__
       
   533 EXPORT_C
       
   534 #endif
       
   535 
       
   536 GstFlowReturn
       
   537 gst_collect_pads_collect_range (GstCollectPads * pads, guint64 offset,
       
   538     guint length)
       
   539 {
       
   540   g_return_val_if_fail (pads != NULL, GST_FLOW_ERROR);
       
   541   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
       
   542 
       
   543   g_warning ("gst_collect_pads_collect_range() is not implemented");
       
   544 
       
   545   return GST_FLOW_NOT_SUPPORTED;
       
   546 }
       
   547 
       
   548 /* FIXME, I think this function is used to work around bad behaviour
       
   549  * of elements that add pads to themselves without activating them.
       
   550  *
       
   551  * Must be called with PAD_LOCK.
       
   552  */
       
   553 static void
       
   554 gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
       
   555     gboolean flushing)
       
   556 {
       
   557   GSList *walk = NULL;
       
   558 
       
   559   /* Update the pads flushing flag */
       
   560   for (walk = pads->data; walk; walk = g_slist_next (walk)) {
       
   561     GstCollectData *cdata = walk->data;
       
   562 
       
   563     if (GST_IS_PAD (cdata->pad)) {
       
   564       GST_OBJECT_LOCK (cdata->pad);
       
   565       if (flushing)
       
   566         GST_PAD_SET_FLUSHING (cdata->pad);
       
   567       else
       
   568         GST_PAD_UNSET_FLUSHING (cdata->pad);
       
   569       cdata->abidata.ABI.flushing = flushing;
       
   570       gst_collect_pads_clear (pads, cdata);
       
   571       GST_OBJECT_UNLOCK (cdata->pad);
       
   572     }
       
   573   }
       
   574 }
       
   575 
       
   576 /**
       
   577  * gst_collect_pads_set_flushing:
       
   578  * @pads: the collectspads to use
       
   579  * @flushing: desired state of the pads
       
   580  *
       
   581  * Change the flushing state of all the pads in the collection. No pad
       
   582  * is able to accept anymore data when @flushing is %TRUE. Calling this
       
   583  * function with @flushing %FALSE makes @pads accept data again.
       
   584  *
       
   585  * MT safe.
       
   586  *
       
   587  * Since: 0.10.7.
       
   588  */
       
   589 #ifdef __SYMBIAN32__
       
   590 EXPORT_C
       
   591 #endif
       
   592 
       
   593 void
       
   594 gst_collect_pads_set_flushing (GstCollectPads * pads, gboolean flushing)
       
   595 {
       
   596   g_return_if_fail (pads != NULL);
       
   597   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
       
   598 
       
   599   GST_COLLECT_PADS_PAD_LOCK (pads);
       
   600   gst_collect_pads_set_flushing_unlocked (pads, flushing);
       
   601   GST_COLLECT_PADS_PAD_UNLOCK (pads);
       
   602 }
       
   603 
       
   604 /**
       
   605  * gst_collect_pads_start:
       
   606  * @pads: the collectspads to use
       
   607  *
       
   608  * Starts the processing of data in the collect_pads.
       
   609  *
       
   610  * MT safe.
       
   611  */
       
   612 #ifdef __SYMBIAN32__
       
   613 EXPORT_C
       
   614 #endif
       
   615 
       
   616 void
       
   617 gst_collect_pads_start (GstCollectPads * pads)
       
   618 {
       
   619   GSList *collected;
       
   620 
       
   621   g_return_if_fail (pads != NULL);
       
   622   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
       
   623 
       
   624   GST_DEBUG_OBJECT (pads, "starting collect pads");
       
   625 
       
   626   /* make sure stop and collect cannot be called anymore */
       
   627   GST_OBJECT_LOCK (pads);
       
   628 
       
   629   /* make pads streamable */
       
   630   GST_COLLECT_PADS_PAD_LOCK (pads);
       
   631 
       
   632   /* loop over the master pad list and reset the segment */
       
   633   collected = pads->abidata.ABI.pad_list;
       
   634   for (; collected; collected = g_slist_next (collected)) {
       
   635     GstCollectData *data;
       
   636 
       
   637     data = collected->data;
       
   638     gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
       
   639   }
       
   640 
       
   641   gst_collect_pads_set_flushing_unlocked (pads, FALSE);
       
   642 
       
   643   /* Start collect pads */
       
   644   pads->started = TRUE;
       
   645   GST_COLLECT_PADS_PAD_UNLOCK (pads);
       
   646   GST_OBJECT_UNLOCK (pads);
       
   647 }
       
   648 
       
   649 /**
       
   650  * gst_collect_pads_stop:
       
   651  * @pads: the collectspads to use
       
   652  *
       
   653  * Stops the processing of data in the collect_pads. this function
       
   654  * will also unblock any blocking operations.
       
   655  *
       
   656  * MT safe.
       
   657  */
       
   658 #ifdef __SYMBIAN32__
       
   659 EXPORT_C
       
   660 #endif
       
   661 
       
   662 void
       
   663 gst_collect_pads_stop (GstCollectPads * pads)
       
   664 {
       
   665   GSList *collected;
       
   666 
       
   667   g_return_if_fail (pads != NULL);
       
   668   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
       
   669 
       
   670   GST_DEBUG_OBJECT (pads, "stopping collect pads");
       
   671 
       
   672   /* make sure collect and start cannot be called anymore */
       
   673   GST_OBJECT_LOCK (pads);
       
   674 
       
   675   /* make pads not accept data anymore */
       
   676   GST_COLLECT_PADS_PAD_LOCK (pads);
       
   677   gst_collect_pads_set_flushing_unlocked (pads, TRUE);
       
   678 
       
   679   /* Stop collect pads */
       
   680   pads->started = FALSE;
       
   681   pads->eospads = 0;
       
   682   pads->queuedpads = 0;
       
   683 
       
   684   /* loop over the master pad list and flush buffers */
       
   685   collected = pads->abidata.ABI.pad_list;
       
   686   for (; collected; collected = g_slist_next (collected)) {
       
   687     GstCollectData *data;
       
   688     GstBuffer **buffer_p;
       
   689 
       
   690     data = collected->data;
       
   691     if (data->buffer) {
       
   692       buffer_p = &data->buffer;
       
   693       gst_buffer_replace (buffer_p, NULL);
       
   694       data->pos = 0;
       
   695     }
       
   696     data->abidata.ABI.eos = FALSE;
       
   697   }
       
   698 
       
   699   GST_COLLECT_PADS_PAD_UNLOCK (pads);
       
   700   /* Wake them up so they can end the chain functions. */
       
   701   GST_COLLECT_PADS_BROADCAST (pads);
       
   702 
       
   703   GST_OBJECT_UNLOCK (pads);
       
   704 }
       
   705 
       
   706 /**
       
   707  * gst_collect_pads_peek:
       
   708  * @pads: the collectspads to peek
       
   709  * @data: the data to use
       
   710  *
       
   711  * Peek at the buffer currently queued in @data. This function
       
   712  * should be called with the @pads LOCK held, such as in the callback
       
   713  * handler.
       
   714  *
       
   715  * Returns: The buffer in @data or NULL if no buffer is queued.
       
   716  *  should unref the buffer after usage.
       
   717  *
       
   718  * MT safe.
       
   719  */
       
   720 #ifdef __SYMBIAN32__
       
   721 EXPORT_C
       
   722 #endif
       
   723 
       
   724 GstBuffer *
       
   725 gst_collect_pads_peek (GstCollectPads * pads, GstCollectData * data)
       
   726 {
       
   727   GstBuffer *result;
       
   728 
       
   729   g_return_val_if_fail (pads != NULL, NULL);
       
   730   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
       
   731   g_return_val_if_fail (data != NULL, NULL);
       
   732 
       
   733   if ((result = data->buffer))
       
   734     gst_buffer_ref (result);
       
   735 
       
   736   GST_DEBUG ("Peeking at pad %s:%s: buffer=%p",
       
   737       GST_DEBUG_PAD_NAME (data->pad), result);
       
   738 
       
   739   return result;
       
   740 }
       
   741 
       
   742 /**
       
   743  * gst_collect_pads_pop:
       
   744  * @pads: the collectspads to pop
       
   745  * @data: the data to use
       
   746  *
       
   747  * Pop the buffer currently queued in @data. This function
       
   748  * should be called with the @pads LOCK held, such as in the callback
       
   749  * handler.
       
   750  *
       
   751  * Returns: The buffer in @data or NULL if no buffer was queued.
       
   752  *   You should unref the buffer after usage.
       
   753  *
       
   754  * MT safe.
       
   755  */
       
   756 #ifdef __SYMBIAN32__
       
   757 EXPORT_C
       
   758 #endif
       
   759 
       
   760 GstBuffer *
       
   761 gst_collect_pads_pop (GstCollectPads * pads, GstCollectData * data)
       
   762 {
       
   763   GstBuffer *result;
       
   764 
       
   765   g_return_val_if_fail (pads != NULL, NULL);
       
   766   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
       
   767   g_return_val_if_fail (data != NULL, NULL);
       
   768 
       
   769   if ((result = data->buffer)) {
       
   770     data->buffer = NULL;
       
   771     data->pos = 0;
       
   772     /* one less pad with queued data now */
       
   773     pads->queuedpads--;
       
   774   }
       
   775 
       
   776   GST_COLLECT_PADS_BROADCAST (pads);
       
   777 
       
   778   GST_DEBUG ("Pop buffer on pad %s:%s: buffer=%p",
       
   779       GST_DEBUG_PAD_NAME (data->pad), result);
       
   780 
       
   781   return result;
       
   782 }
       
   783 
       
   784 /* pop and unref the currently queued buffer, should e called with the LOCK
       
   785  * helt. */
       
   786 static void
       
   787 gst_collect_pads_clear (GstCollectPads * pads, GstCollectData * data)
       
   788 {
       
   789   GstBuffer *buf;
       
   790 
       
   791   if ((buf = gst_collect_pads_pop (pads, data)))
       
   792     gst_buffer_unref (buf);
       
   793 }
       
   794 
       
   795 /**
       
   796  * gst_collect_pads_available:
       
   797  * @pads: the collectspads to query
       
   798  *
       
   799  * Query how much bytes can be read from each queued buffer. This means
       
   800  * that the result of this call is the maximum number of bytes that can
       
   801  * be read from each of the pads.
       
   802  *
       
   803  * This function should be called with @pads LOCK held, such as
       
   804  * in the callback.
       
   805  *
       
   806  * Returns: The maximum number of bytes queued on all pads. This function
       
   807  * returns 0 if a pad has no queued buffer.
       
   808  *
       
   809  * MT safe.
       
   810  */
       
   811 /* FIXME, we can do this in the _chain functions */
       
   812 #ifdef __SYMBIAN32__
       
   813 EXPORT_C
       
   814 #endif
       
   815 
       
   816 guint
       
   817 gst_collect_pads_available (GstCollectPads * pads)
       
   818 {
       
   819   GSList *collected;
       
   820   guint result = G_MAXUINT;
       
   821 
       
   822   g_return_val_if_fail (pads != NULL, 0);
       
   823   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
       
   824 
       
   825   for (collected = pads->data; collected; collected = g_slist_next (collected)) {
       
   826     GstCollectData *pdata;
       
   827     GstBuffer *buffer;
       
   828     gint size;
       
   829 
       
   830     pdata = (GstCollectData *) collected->data;
       
   831 
       
   832     /* ignore pad with EOS */
       
   833     if (G_UNLIKELY (pdata->abidata.ABI.eos)) {
       
   834       GST_DEBUG ("pad %p is EOS", pdata);
       
   835       continue;
       
   836     }
       
   837 
       
   838     /* an empty buffer without EOS is weird when we get here.. */
       
   839     if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
       
   840       GST_WARNING ("pad %p has no buffer", pdata);
       
   841       goto not_filled;
       
   842     }
       
   843 
       
   844     /* this is the size left of the buffer */
       
   845     size = GST_BUFFER_SIZE (buffer) - pdata->pos;
       
   846     GST_DEBUG ("pad %p has %d bytes left", pdata, size);
       
   847 
       
   848     /* need to return the min of all available data */
       
   849     if (size < result)
       
   850       result = size;
       
   851   }
       
   852   /* nothing changed, all must be EOS then, return 0 */
       
   853   if (G_UNLIKELY (result == G_MAXUINT))
       
   854     result = 0;
       
   855 
       
   856   return result;
       
   857 
       
   858 not_filled:
       
   859   {
       
   860     return 0;
       
   861   }
       
   862 }
       
   863 
       
   864 /**
       
   865  * gst_collect_pads_read:
       
   866  * @pads: the collectspads to query
       
   867  * @data: the data to use
       
   868  * @bytes: a pointer to a byte array
       
   869  * @size: the number of bytes to read
       
   870  *
       
   871  * Get a pointer in @bytes where @size bytes can be read from the
       
   872  * given pad @data.
       
   873  *
       
   874  * This function should be called with @pads LOCK held, such as
       
   875  * in the callback.
       
   876  *
       
   877  * Returns: The number of bytes available for consumption in the
       
   878  * memory pointed to by @bytes. This can be less than @size and
       
   879  * is 0 if the pad is end-of-stream.
       
   880  *
       
   881  * MT safe.
       
   882  */
       
   883 #ifdef __SYMBIAN32__
       
   884 EXPORT_C
       
   885 #endif
       
   886 
       
   887 guint
       
   888 gst_collect_pads_read (GstCollectPads * pads, GstCollectData * data,
       
   889     guint8 ** bytes, guint size)
       
   890 {
       
   891   guint readsize;
       
   892   GstBuffer *buffer;
       
   893 
       
   894   g_return_val_if_fail (pads != NULL, 0);
       
   895   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
       
   896   g_return_val_if_fail (data != NULL, 0);
       
   897   g_return_val_if_fail (bytes != NULL, 0);
       
   898 
       
   899   /* no buffer, must be EOS */
       
   900   if ((buffer = data->buffer) == NULL)
       
   901     return 0;
       
   902 
       
   903   readsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
       
   904 
       
   905   *bytes = GST_BUFFER_DATA (buffer) + data->pos;
       
   906 
       
   907   return readsize;
       
   908 }
       
   909 
       
   910 /**
       
   911  * gst_collect_pads_read_buffer:
       
   912  * @pads: the collectspads to query
       
   913  * @data: the data to use
       
   914  * @size: the number of bytes to read
       
   915  *
       
   916  * Get a subbuffer of @size bytes from the given pad @data.
       
   917  *
       
   918  * This function should be called with @pads LOCK held, such as in the callback.
       
   919  *
       
   920  * Since: 0.10.18
       
   921  *
       
   922  * Returns: A sub buffer. The size of the buffer can be less that requested.
       
   923  * A return of NULL signals that the pad is end-of-stream.
       
   924  * Unref the buffer after use.
       
   925  *
       
   926  * MT safe.
       
   927  */
       
   928 #ifdef __SYMBIAN32__
       
   929 EXPORT_C
       
   930 #endif
       
   931 
       
   932 GstBuffer *
       
   933 gst_collect_pads_read_buffer (GstCollectPads * pads, GstCollectData * data,
       
   934     guint size)
       
   935 {
       
   936   guint readsize;
       
   937   GstBuffer *buffer;
       
   938 
       
   939   g_return_val_if_fail (pads != NULL, NULL);
       
   940   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), NULL);
       
   941   g_return_val_if_fail (data != NULL, NULL);
       
   942 
       
   943   /* no buffer, must be EOS */
       
   944   if ((buffer = data->buffer) == NULL)
       
   945     return NULL;
       
   946 
       
   947   readsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
       
   948 
       
   949   return gst_buffer_create_sub (buffer, data->pos, readsize);
       
   950 }
       
   951 
       
   952 /**
       
   953  * gst_collect_pads_take_buffer:
       
   954  * @pads: the collectspads to query
       
   955  * @data: the data to use
       
   956  * @size: the number of bytes to read
       
   957  *
       
   958  * Get a subbuffer of @size bytes from the given pad @data. Flushes the amount
       
   959  * of read bytes.
       
   960  *
       
   961  * This function should be called with @pads LOCK held, such as in the callback.
       
   962  *
       
   963  * Since: 0.10.18
       
   964  *
       
   965  * Returns: A sub buffer. The size of the buffer can be less that requested.
       
   966  * A return of NULL signals that the pad is end-of-stream.
       
   967  * Unref the buffer after use.
       
   968  *
       
   969  * MT safe.
       
   970  */
       
   971 #ifdef __SYMBIAN32__
       
   972 EXPORT_C
       
   973 #endif
       
   974 
       
   975 GstBuffer *
       
   976 gst_collect_pads_take_buffer (GstCollectPads * pads, GstCollectData * data,
       
   977     guint size)
       
   978 {
       
   979   GstBuffer *buffer = gst_collect_pads_read_buffer (pads, data, size);
       
   980 
       
   981   if (buffer) {
       
   982     gst_collect_pads_flush (pads, data, GST_BUFFER_SIZE (buffer));
       
   983   }
       
   984   return buffer;
       
   985 }
       
   986 
       
   987 /**
       
   988  * gst_collect_pads_flush:
       
   989  * @pads: the collectspads to query
       
   990  * @data: the data to use
       
   991  * @size: the number of bytes to flush
       
   992  *
       
   993  * Flush @size bytes from the pad @data.
       
   994  *
       
   995  * This function should be called with @pads LOCK held, such as
       
   996  * in the callback.
       
   997  *
       
   998  * Returns: The number of bytes flushed. This can be less than @size and
       
   999  * is 0 if the pad was end-of-stream.
       
  1000  *
       
  1001  * MT safe.
       
  1002  */
       
  1003 #ifdef __SYMBIAN32__
       
  1004 EXPORT_C
       
  1005 #endif
       
  1006 
       
  1007 guint
       
  1008 gst_collect_pads_flush (GstCollectPads * pads, GstCollectData * data,
       
  1009     guint size)
       
  1010 {
       
  1011   guint flushsize;
       
  1012   GstBuffer *buffer;
       
  1013 
       
  1014   g_return_val_if_fail (pads != NULL, 0);
       
  1015   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), 0);
       
  1016   g_return_val_if_fail (data != NULL, 0);
       
  1017 
       
  1018   /* no buffer, must be EOS */
       
  1019   if ((buffer = data->buffer) == NULL)
       
  1020     return 0;
       
  1021 
       
  1022   /* this is what we can flush at max */
       
  1023   flushsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
       
  1024 
       
  1025   data->pos += size;
       
  1026 
       
  1027   if (data->pos >= GST_BUFFER_SIZE (buffer))
       
  1028     /* _clear will also reset data->pos to 0 */
       
  1029     gst_collect_pads_clear (pads, data);
       
  1030 
       
  1031   return flushsize;
       
  1032 }
       
  1033 
       
  1034 /* see if pads were added or removed and update our stats. Any pad
       
  1035  * added after releasing the PAD_LOCK will get collected in the next
       
  1036  * round.
       
  1037  *
       
  1038  * We can do a quick check by checking the cookies, that get changed
       
  1039  * whenever the pad list is updated.
       
  1040  *
       
  1041  * Must be called with LOCK.
       
  1042  */
       
  1043 static void
       
  1044 gst_collect_pads_check_pads (GstCollectPads * pads)
       
  1045 {
       
  1046   /* the master list and cookie are protected with the PAD_LOCK */
       
  1047   GST_COLLECT_PADS_PAD_LOCK (pads);
       
  1048   if (G_UNLIKELY (pads->abidata.ABI.pad_cookie != pads->cookie)) {
       
  1049     GSList *collected;
       
  1050 
       
  1051     /* clear list and stats */
       
  1052     g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
       
  1053     g_slist_free (pads->data);
       
  1054     pads->data = NULL;
       
  1055     pads->numpads = 0;
       
  1056     pads->queuedpads = 0;
       
  1057     pads->eospads = 0;
       
  1058 
       
  1059     /* loop over the master pad list */
       
  1060     collected = pads->abidata.ABI.pad_list;
       
  1061     for (; collected; collected = g_slist_next (collected)) {
       
  1062       GstCollectData *data;
       
  1063 
       
  1064       /* update the stats */
       
  1065       pads->numpads++;
       
  1066       data = collected->data;
       
  1067       if (data->buffer)
       
  1068         pads->queuedpads++;
       
  1069       if (data->abidata.ABI.eos)
       
  1070         pads->eospads++;
       
  1071 
       
  1072       /* add to the list of pads to collect */
       
  1073       ref_data (data);
       
  1074       pads->data = g_slist_prepend (pads->data, data);
       
  1075     }
       
  1076     /* and update the cookie */
       
  1077     pads->cookie = pads->abidata.ABI.pad_cookie;
       
  1078   }
       
  1079   GST_COLLECT_PADS_PAD_UNLOCK (pads);
       
  1080 }
       
  1081 
       
  1082 /* checks if all the pads are collected and call the collectfunction
       
  1083  *
       
  1084  * Should be called with LOCK.
       
  1085  *
       
  1086  * Returns: The #GstFlowReturn of collection.
       
  1087  */
       
  1088 static GstFlowReturn
       
  1089 gst_collect_pads_check_collected (GstCollectPads * pads)
       
  1090 {
       
  1091   GstFlowReturn flow_ret = GST_FLOW_OK;
       
  1092 
       
  1093   g_return_val_if_fail (GST_IS_COLLECT_PADS (pads), GST_FLOW_ERROR);
       
  1094   g_return_val_if_fail (pads->func != NULL, GST_FLOW_NOT_SUPPORTED);
       
  1095 
       
  1096   /* check for new pads, update stats etc.. */
       
  1097   gst_collect_pads_check_pads (pads);
       
  1098 
       
  1099   if (G_UNLIKELY (pads->eospads == pads->numpads)) {
       
  1100     /* If all our pads are EOS just collect once to let the element
       
  1101      * do its final EOS handling. */
       
  1102     GST_DEBUG ("All active pads (%d) are EOS, calling %s",
       
  1103         pads->numpads, GST_DEBUG_FUNCPTR_NAME (pads->func));
       
  1104     flow_ret = pads->func (pads, pads->user_data);
       
  1105   } else {
       
  1106     gboolean collected = FALSE;
       
  1107 
       
  1108     /* We call the collected function as long as our condition matches.
       
  1109      * FIXME: should we error out if the collect function did not pop anything ?
       
  1110      * we can get a busy loop here if the element does not pop from the collect
       
  1111      * function
       
  1112      */
       
  1113     while (((pads->queuedpads + pads->eospads) >= pads->numpads)) {
       
  1114       GST_DEBUG ("All active pads (%d + %d >= %d) have data, calling %s",
       
  1115           pads->queuedpads, pads->eospads, pads->numpads,
       
  1116           GST_DEBUG_FUNCPTR_NAME (pads->func));
       
  1117       flow_ret = pads->func (pads, pads->user_data);
       
  1118       collected = TRUE;
       
  1119 
       
  1120       /* break on error */
       
  1121       if (flow_ret != GST_FLOW_OK)
       
  1122         break;
       
  1123       /* Don't keep looping after telling the element EOS or flushing */
       
  1124       if (pads->queuedpads == 0)
       
  1125         break;
       
  1126     }
       
  1127     if (!collected)
       
  1128       GST_DEBUG ("Not all active pads (%d) have data, continuing",
       
  1129           pads->numpads);
       
  1130   }
       
  1131   return flow_ret;
       
  1132 }
       
  1133 
       
  1134 static gboolean
       
  1135 gst_collect_pads_event (GstPad * pad, GstEvent * event)
       
  1136 {
       
  1137   gboolean res;
       
  1138   GstCollectData *data;
       
  1139   GstCollectPads *pads;
       
  1140 
       
  1141   /* some magic to get the managing collect_pads */
       
  1142   GST_OBJECT_LOCK (pad);
       
  1143   data = (GstCollectData *) gst_pad_get_element_private (pad);
       
  1144   if (G_UNLIKELY (data == NULL))
       
  1145     goto pad_removed;
       
  1146   ref_data (data);
       
  1147   GST_OBJECT_UNLOCK (pad);
       
  1148 
       
  1149   res = TRUE;
       
  1150 
       
  1151   pads = data->collect;
       
  1152 
       
  1153   GST_DEBUG ("Got %s event on pad %s:%s", GST_EVENT_TYPE_NAME (event),
       
  1154       GST_DEBUG_PAD_NAME (data->pad));
       
  1155 
       
  1156   switch (GST_EVENT_TYPE (event)) {
       
  1157     case GST_EVENT_FLUSH_START:
       
  1158     {
       
  1159       /* forward event to unblock check_collected */
       
  1160       gst_pad_event_default (pad, event);
       
  1161 
       
  1162       /* now unblock the chain function.
       
  1163        * no cond per pad, so they all unblock, 
       
  1164        * non-flushing block again */
       
  1165       GST_OBJECT_LOCK (pads);
       
  1166       data->abidata.ABI.flushing = TRUE;
       
  1167       gst_collect_pads_clear (pads, data);
       
  1168       GST_OBJECT_UNLOCK (pads);
       
  1169 
       
  1170       /* event already cleaned up by forwarding */
       
  1171       goto done;
       
  1172     }
       
  1173     case GST_EVENT_FLUSH_STOP:
       
  1174     {
       
  1175       /* flush the 1 buffer queue */
       
  1176       GST_OBJECT_LOCK (pads);
       
  1177       data->abidata.ABI.flushing = FALSE;
       
  1178       gst_collect_pads_clear (pads, data);
       
  1179       /* we need new segment info after the flush */
       
  1180       gst_segment_init (&data->segment, GST_FORMAT_UNDEFINED);
       
  1181       data->abidata.ABI.new_segment = FALSE;
       
  1182       /* if the pad was EOS, remove the EOS flag and
       
  1183        * decrement the number of eospads */
       
  1184       if (G_UNLIKELY (data->abidata.ABI.eos == TRUE)) {
       
  1185         pads->eospads--;
       
  1186         data->abidata.ABI.eos = FALSE;
       
  1187       }
       
  1188       GST_OBJECT_UNLOCK (pads);
       
  1189 
       
  1190       /* forward event */
       
  1191       goto forward;
       
  1192     }
       
  1193     case GST_EVENT_EOS:
       
  1194     {
       
  1195       GST_OBJECT_LOCK (pads);
       
  1196       /* if the pad was not EOS, make it EOS and so we
       
  1197        * have one more eospad */
       
  1198       if (G_LIKELY (data->abidata.ABI.eos == FALSE)) {
       
  1199         data->abidata.ABI.eos = TRUE;
       
  1200         pads->eospads++;
       
  1201       }
       
  1202       /* check if we need collecting anything, we ignore the
       
  1203        * result. */
       
  1204       gst_collect_pads_check_collected (pads);
       
  1205       GST_OBJECT_UNLOCK (pads);
       
  1206 
       
  1207       /* We eat this event, element should do something
       
  1208        * in the collected callback. */
       
  1209       gst_event_unref (event);
       
  1210       goto done;
       
  1211     }
       
  1212     case GST_EVENT_NEWSEGMENT:
       
  1213     {
       
  1214       gint64 start, stop, time;
       
  1215       gdouble rate, arate;
       
  1216       GstFormat format;
       
  1217       gboolean update;
       
  1218 
       
  1219       gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
       
  1220           &start, &stop, &time);
       
  1221 
       
  1222       GST_DEBUG_OBJECT (data->pad, "got newsegment, start %" GST_TIME_FORMAT
       
  1223           ", stop %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
       
  1224           GST_TIME_ARGS (stop));
       
  1225 
       
  1226       gst_segment_set_newsegment_full (&data->segment, update, rate, arate,
       
  1227           format, start, stop, time);
       
  1228 
       
  1229       data->abidata.ABI.new_segment = TRUE;
       
  1230 
       
  1231       /* we must not forward this event since multiple segments will be 
       
  1232        * accumulated and this is certainly not what we want. */
       
  1233       gst_event_unref (event);
       
  1234       /* FIXME: collect-pads based elements need to create their own newsegment
       
  1235        * event (and only one really)
       
  1236        * (a) make the segment part of the GstCollectData structure of each pad,
       
  1237        * so you can just check that once you have a buffer queued on that pad,
       
  1238        * (b) you can override a pad's event function with your own,
       
  1239        * catch the newsegment event and then pass it on to the original
       
  1240        * gstcollectpads event function
       
  1241        * (that's what avimux does for something IIRC)
       
  1242        * see #340060
       
  1243        */
       
  1244       goto done;
       
  1245     }
       
  1246     default:
       
  1247       /* forward other events */
       
  1248       goto forward;
       
  1249   }
       
  1250 
       
  1251 forward:
       
  1252   res = gst_pad_event_default (pad, event);
       
  1253 
       
  1254 done:
       
  1255   unref_data (data);
       
  1256   return res;
       
  1257 
       
  1258   /* ERRORS */
       
  1259 pad_removed:
       
  1260   {
       
  1261     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
       
  1262     GST_OBJECT_UNLOCK (pad);
       
  1263     return FALSE;
       
  1264   }
       
  1265 }
       
  1266 
       
  1267 /* For each buffer we receive we check if our collected condition is reached
       
  1268  * and if so we call the collected function. When this is done we check if
       
  1269  * data has been unqueued. If data is still queued we wait holding the stream
       
  1270  * lock to make sure no EOS event can happen while we are ready to be
       
  1271  * collected 
       
  1272  */
       
  1273 static GstFlowReturn
       
  1274 gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
       
  1275 {
       
  1276   GstCollectData *data;
       
  1277   GstCollectPads *pads;
       
  1278   guint64 size;
       
  1279   GstFlowReturn ret;
       
  1280   GstBuffer **buffer_p;
       
  1281 
       
  1282   GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
       
  1283 
       
  1284   /* some magic to get the managing collect_pads */
       
  1285   GST_OBJECT_LOCK (pad);
       
  1286   data = (GstCollectData *) gst_pad_get_element_private (pad);
       
  1287   if (G_UNLIKELY (data == NULL))
       
  1288     goto no_data;
       
  1289   ref_data (data);
       
  1290   GST_OBJECT_UNLOCK (pad);
       
  1291 
       
  1292   pads = data->collect;
       
  1293   size = GST_BUFFER_SIZE (buffer);
       
  1294 
       
  1295   GST_OBJECT_LOCK (pads);
       
  1296   /* if not started, bail out */
       
  1297   if (G_UNLIKELY (!pads->started))
       
  1298     goto not_started;
       
  1299   /* check if this pad is flushing */
       
  1300   if (G_UNLIKELY (data->abidata.ABI.flushing))
       
  1301     goto flushing;
       
  1302   /* pad was EOS, we can refuse this data */
       
  1303   if (G_UNLIKELY (data->abidata.ABI.eos))
       
  1304     goto unexpected;
       
  1305 
       
  1306   GST_DEBUG ("Queuing buffer %p for pad %s:%s", buffer,
       
  1307       GST_DEBUG_PAD_NAME (pad));
       
  1308 
       
  1309   /* One more pad has data queued */
       
  1310   pads->queuedpads++;
       
  1311   buffer_p = &data->buffer;
       
  1312   gst_buffer_replace (buffer_p, buffer);
       
  1313 
       
  1314   /* update segment last position if in TIME */
       
  1315   if (G_LIKELY (data->segment.format == GST_FORMAT_TIME)) {
       
  1316     GstClockTime timestamp = GST_BUFFER_TIMESTAMP (buffer);
       
  1317 
       
  1318     if (GST_CLOCK_TIME_IS_VALID (timestamp))
       
  1319       gst_segment_set_last_stop (&data->segment, GST_FORMAT_TIME, timestamp);
       
  1320   }
       
  1321 
       
  1322   /* While we have data queued on this pad try to collect stuff */
       
  1323   do {
       
  1324     /* Check if our collected condition is matched and call the collected function
       
  1325      * if it is */
       
  1326     ret = gst_collect_pads_check_collected (pads);
       
  1327     /* when an error occurs, we want to report this back to the caller ASAP
       
  1328      * without having to block if the buffer was not popped */
       
  1329     if (G_UNLIKELY (ret != GST_FLOW_OK))
       
  1330       goto error;
       
  1331 
       
  1332     /* data was consumed, we can exit and accept new data */
       
  1333     if (data->buffer == NULL)
       
  1334       break;
       
  1335 
       
  1336     /* Check if we got removed in the mean time, FIXME, this is racy.
       
  1337      * Between this check and the _WAIT, the pad could be removed which will
       
  1338      * makes us hang in the _WAIT. */
       
  1339     GST_OBJECT_LOCK (pad);
       
  1340     if (G_UNLIKELY (gst_pad_get_element_private (pad) == NULL))
       
  1341       goto pad_removed;
       
  1342     GST_OBJECT_UNLOCK (pad);
       
  1343 
       
  1344     GST_DEBUG ("Pad %s:%s has a buffer queued, waiting",
       
  1345         GST_DEBUG_PAD_NAME (pad));
       
  1346 
       
  1347     /* wait to be collected, this must happen from another thread triggered
       
  1348      * by the _chain function of another pad. We release the lock so we
       
  1349      * can get stopped or flushed as well. We can however not get EOS
       
  1350      * because we still hold the STREAM_LOCK. 
       
  1351      */
       
  1352     GST_COLLECT_PADS_WAIT (pads);
       
  1353 
       
  1354     GST_DEBUG ("Pad %s:%s resuming", GST_DEBUG_PAD_NAME (pad));
       
  1355 
       
  1356     /* after a signal, we could be stopped */
       
  1357     if (G_UNLIKELY (!pads->started))
       
  1358       goto not_started;
       
  1359     /* check if this pad is flushing */
       
  1360     if (G_UNLIKELY (data->abidata.ABI.flushing))
       
  1361       goto flushing;
       
  1362   }
       
  1363   while (data->buffer != NULL);
       
  1364 
       
  1365 unlock_done:
       
  1366   GST_OBJECT_UNLOCK (pads);
       
  1367   unref_data (data);
       
  1368   gst_buffer_unref (buffer);
       
  1369   return ret;
       
  1370 
       
  1371 pad_removed:
       
  1372   {
       
  1373     GST_WARNING ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
       
  1374     GST_OBJECT_UNLOCK (pad);
       
  1375     ret = GST_FLOW_NOT_LINKED;
       
  1376     goto unlock_done;
       
  1377   }
       
  1378   /* ERRORS */
       
  1379 no_data:
       
  1380   {
       
  1381     GST_DEBUG ("%s got removed from collectpads", GST_OBJECT_NAME (pad));
       
  1382     GST_OBJECT_UNLOCK (pad);
       
  1383     gst_buffer_unref (buffer);
       
  1384     return GST_FLOW_NOT_LINKED;
       
  1385   }
       
  1386 not_started:
       
  1387   {
       
  1388     GST_DEBUG ("not started");
       
  1389     gst_collect_pads_clear (pads, data);
       
  1390     ret = GST_FLOW_WRONG_STATE;
       
  1391     goto unlock_done;
       
  1392   }
       
  1393 flushing:
       
  1394   {
       
  1395     GST_DEBUG ("pad %s:%s is flushing", GST_DEBUG_PAD_NAME (pad));
       
  1396     gst_collect_pads_clear (pads, data);
       
  1397     ret = GST_FLOW_WRONG_STATE;
       
  1398     goto unlock_done;
       
  1399   }
       
  1400 unexpected:
       
  1401   {
       
  1402     /* we should not post an error for this, just inform upstream that
       
  1403      * we don't expect anything anymore */
       
  1404     GST_DEBUG ("pad %s:%s is eos", GST_DEBUG_PAD_NAME (pad));
       
  1405     ret = GST_FLOW_UNEXPECTED;
       
  1406     goto unlock_done;
       
  1407   }
       
  1408 error:
       
  1409   {
       
  1410     /* we print the error, the element should post a reasonable error
       
  1411      * message for fatal errors */
       
  1412     GST_DEBUG ("collect failed, reason %d (%s)", ret, gst_flow_get_name (ret));
       
  1413     gst_collect_pads_clear (pads, data);
       
  1414     goto unlock_done;
       
  1415   }
       
  1416 }