gstreamer_core/libs/gst/base/gstcollectpads.c
branchRCL_3
changeset 30 7e817e7e631c
parent 29 567bb019e3e3
equal deleted inserted replaced
29:567bb019e3e3 30:7e817e7e631c
    93 static void gst_collect_pads_finalize (GObject * object);
    93 static void gst_collect_pads_finalize (GObject * object);
    94 static void gst_collect_pads_init (GstCollectPads * pads,
    94 static void gst_collect_pads_init (GstCollectPads * pads,
    95     GstCollectPadsClass * g_class);
    95     GstCollectPadsClass * g_class);
    96 static void ref_data (GstCollectData * data);
    96 static void ref_data (GstCollectData * data);
    97 static void unref_data (GstCollectData * data);
    97 static void unref_data (GstCollectData * data);
    98 static void gst_collect_pads_check_pads_unlocked (GstCollectPads * pads);
       
    99 
    98 
   100 static void
    99 static void
   101 gst_collect_pads_base_init (gpointer g_class)
   100 gst_collect_pads_base_init (gpointer g_class)
   102 {
   101 {
   103   /* Do nothing here */
   102   /* Do nothing here */
   544   g_warning ("gst_collect_pads_collect_range() is not implemented");
   543   g_warning ("gst_collect_pads_collect_range() is not implemented");
   545 
   544 
   546   return GST_FLOW_NOT_SUPPORTED;
   545   return GST_FLOW_NOT_SUPPORTED;
   547 }
   546 }
   548 
   547 
   549 static gboolean
       
   550 gst_collect_pads_is_flushing (GstCollectPads * pads)
       
   551 {
       
   552   GSList *walk = NULL;
       
   553   gboolean res = TRUE;
       
   554 
       
   555   GST_COLLECT_PADS_PAD_LOCK (pads);
       
   556 
       
   557   /* Ensure pads->data state */
       
   558   gst_collect_pads_check_pads_unlocked (pads);
       
   559 
       
   560   GST_DEBUG ("Getting flushing state (pads:%p, pads->data:%p)",
       
   561       pads, pads->data);
       
   562 
       
   563   for (walk = pads->data; walk; walk = g_slist_next (walk)) {
       
   564     GstCollectData *cdata = walk->data;
       
   565 
       
   566     GST_DEBUG_OBJECT (cdata->pad, "flushing:%d", cdata->abidata.ABI.flushing);
       
   567 
       
   568     if (cdata->abidata.ABI.flushing) {
       
   569       goto done;
       
   570     }
       
   571   }
       
   572 
       
   573   res = FALSE;
       
   574 done:
       
   575   GST_COLLECT_PADS_PAD_UNLOCK (pads);
       
   576   return res;
       
   577 }
       
   578 
       
   579 /* FIXME, I think this function is used to work around bad behaviour
   548 /* FIXME, I think this function is used to work around bad behaviour
   580  * of elements that add pads to themselves without activating them.
   549  * of elements that add pads to themselves without activating them.
   581  *
   550  *
   582  * Must be called with PAD_LOCK.
   551  * Must be called with PAD_LOCK.
   583  */
   552  */
   584 static void
   553 static void
   585 gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
   554 gst_collect_pads_set_flushing_unlocked (GstCollectPads * pads,
   586     gboolean flushing)
   555     gboolean flushing)
   587 {
   556 {
   588   GSList *walk = NULL;
   557   GSList *walk = NULL;
   589 
       
   590   GST_DEBUG ("Setting flushing (%d)", flushing);
       
   591 
   558 
   592   /* Update the pads flushing flag */
   559   /* Update the pads flushing flag */
   593   for (walk = pads->data; walk; walk = g_slist_next (walk)) {
   560   for (walk = pads->data; walk; walk = g_slist_next (walk)) {
   594     GstCollectData *cdata = walk->data;
   561     GstCollectData *cdata = walk->data;
   595 
   562 
   602       cdata->abidata.ABI.flushing = flushing;
   569       cdata->abidata.ABI.flushing = flushing;
   603       gst_collect_pads_clear (pads, cdata);
   570       gst_collect_pads_clear (pads, cdata);
   604       GST_OBJECT_UNLOCK (cdata->pad);
   571       GST_OBJECT_UNLOCK (cdata->pad);
   605     }
   572     }
   606   }
   573   }
   607   /* Setting the pads to flushing means that we changed the values which
       
   608    * are 'protected' by the cookie. We therefore update it to force a 
       
   609    * recalculation of the current pad status. */
       
   610   pads->abidata.ABI.pad_cookie++;
       
   611 }
   574 }
   612 
   575 
   613 /**
   576 /**
   614  * gst_collect_pads_set_flushing:
   577  * gst_collect_pads_set_flushing:
   615  * @pads: the collectspads to use
   578  * @pads: the collectspads to use
   632 {
   595 {
   633   g_return_if_fail (pads != NULL);
   596   g_return_if_fail (pads != NULL);
   634   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
   597   g_return_if_fail (GST_IS_COLLECT_PADS (pads));
   635 
   598 
   636   GST_COLLECT_PADS_PAD_LOCK (pads);
   599   GST_COLLECT_PADS_PAD_LOCK (pads);
   637   /* Ensure pads->data state */
       
   638   gst_collect_pads_check_pads_unlocked (pads);
       
   639   gst_collect_pads_set_flushing_unlocked (pads, flushing);
   600   gst_collect_pads_set_flushing_unlocked (pads, flushing);
   640   GST_COLLECT_PADS_PAD_UNLOCK (pads);
   601   GST_COLLECT_PADS_PAD_UNLOCK (pads);
   641 }
   602 }
   642 
   603 
   643 /**
   604 /**
   868 
   829 
   869     pdata = (GstCollectData *) collected->data;
   830     pdata = (GstCollectData *) collected->data;
   870 
   831 
   871     /* ignore pad with EOS */
   832     /* ignore pad with EOS */
   872     if (G_UNLIKELY (pdata->abidata.ABI.eos)) {
   833     if (G_UNLIKELY (pdata->abidata.ABI.eos)) {
   873       GST_DEBUG ("pad %s:%s is EOS", GST_DEBUG_PAD_NAME (pdata->pad));
   834       GST_DEBUG ("pad %p is EOS", pdata);
   874       continue;
   835       continue;
   875     }
   836     }
   876 
   837 
   877     /* an empty buffer without EOS is weird when we get here.. */
   838     /* an empty buffer without EOS is weird when we get here.. */
   878     if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
   839     if (G_UNLIKELY ((buffer = pdata->buffer) == NULL)) {
   879       GST_WARNING ("pad %s:%s has no buffer", GST_DEBUG_PAD_NAME (pdata->pad));
   840       GST_WARNING ("pad %p has no buffer", pdata);
   880       goto not_filled;
   841       goto not_filled;
   881     }
   842     }
   882 
   843 
   883     /* this is the size left of the buffer */
   844     /* this is the size left of the buffer */
   884     size = GST_BUFFER_SIZE (buffer) - pdata->pos;
   845     size = GST_BUFFER_SIZE (buffer) - pdata->pos;
   885     GST_DEBUG ("pad %s:%s has %d bytes left",
   846     GST_DEBUG ("pad %p has %d bytes left", pdata, size);
   886         GST_DEBUG_PAD_NAME (pdata->pad), size);
       
   887 
   847 
   888     /* need to return the min of all available data */
   848     /* need to return the min of all available data */
   889     if (size < result)
   849     if (size < result)
   890       result = size;
   850       result = size;
   891   }
   851   }
  1062   /* this is what we can flush at max */
  1022   /* this is what we can flush at max */
  1063   flushsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
  1023   flushsize = MIN (size, GST_BUFFER_SIZE (buffer) - data->pos);
  1064 
  1024 
  1065   data->pos += size;
  1025   data->pos += size;
  1066 
  1026 
  1067   GST_LOG_OBJECT (pads, "Flushing %d bytes, requested %u", flushsize, size);
       
  1068 
       
  1069   if (data->pos >= GST_BUFFER_SIZE (buffer))
  1027   if (data->pos >= GST_BUFFER_SIZE (buffer))
  1070     /* _clear will also reset data->pos to 0 */
  1028     /* _clear will also reset data->pos to 0 */
  1071     gst_collect_pads_clear (pads, data);
  1029     gst_collect_pads_clear (pads, data);
  1072 
  1030 
  1073   return flushsize;
  1031   return flushsize;
  1081  * whenever the pad list is updated.
  1039  * whenever the pad list is updated.
  1082  *
  1040  *
  1083  * Must be called with LOCK.
  1041  * Must be called with LOCK.
  1084  */
  1042  */
  1085 static void
  1043 static void
  1086 gst_collect_pads_check_pads_unlocked (GstCollectPads * pads)
  1044 gst_collect_pads_check_pads (GstCollectPads * pads)
  1087 {
  1045 {
  1088   GST_DEBUG ("stored cookie : %d, used_cookie:%d",
  1046   /* the master list and cookie are protected with the PAD_LOCK */
  1089       pads->abidata.ABI.pad_cookie, pads->cookie);
  1047   GST_COLLECT_PADS_PAD_LOCK (pads);
  1090   if (G_UNLIKELY (pads->abidata.ABI.pad_cookie != pads->cookie)) {
  1048   if (G_UNLIKELY (pads->abidata.ABI.pad_cookie != pads->cookie)) {
  1091     GSList *collected;
  1049     GSList *collected;
  1092 
  1050 
  1093     /* clear list and stats */
  1051     /* clear list and stats */
  1094     g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
  1052     g_slist_foreach (pads->data, (GFunc) unref_data, NULL);
  1104       GstCollectData *data;
  1062       GstCollectData *data;
  1105 
  1063 
  1106       /* update the stats */
  1064       /* update the stats */
  1107       pads->numpads++;
  1065       pads->numpads++;
  1108       data = collected->data;
  1066       data = collected->data;
  1109 
  1067       if (data->buffer)
  1110       if (G_LIKELY (!data->abidata.ABI.flushing)) {
  1068         pads->queuedpads++;
  1111         if (data->buffer)
  1069       if (data->abidata.ABI.eos)
  1112           pads->queuedpads++;
  1070         pads->eospads++;
  1113         if (data->abidata.ABI.eos)
       
  1114           pads->eospads++;
       
  1115       }
       
  1116 
  1071 
  1117       /* add to the list of pads to collect */
  1072       /* add to the list of pads to collect */
  1118       ref_data (data);
  1073       ref_data (data);
  1119       pads->data = g_slist_prepend (pads->data, data);
  1074       pads->data = g_slist_prepend (pads->data, data);
  1120     }
  1075     }
  1121     /* and update the cookie */
  1076     /* and update the cookie */
  1122     pads->cookie = pads->abidata.ABI.pad_cookie;
  1077     pads->cookie = pads->abidata.ABI.pad_cookie;
  1123   }
  1078   }
  1124 }
       
  1125 
       
  1126 static inline void
       
  1127 gst_collect_pads_check_pads (GstCollectPads * pads)
       
  1128 {
       
  1129   /* the master list and cookie are protected with the PAD_LOCK */
       
  1130   GST_COLLECT_PADS_PAD_LOCK (pads);
       
  1131   gst_collect_pads_check_pads_unlocked (pads);
       
  1132   GST_COLLECT_PADS_PAD_UNLOCK (pads);
  1079   GST_COLLECT_PADS_PAD_UNLOCK (pads);
  1133 }
  1080 }
  1134 
  1081 
  1135 /* checks if all the pads are collected and call the collectfunction
  1082 /* checks if all the pads are collected and call the collectfunction
  1136  *
  1083  *
  1236        * decrement the number of eospads */
  1183        * decrement the number of eospads */
  1237       if (G_UNLIKELY (data->abidata.ABI.eos == TRUE)) {
  1184       if (G_UNLIKELY (data->abidata.ABI.eos == TRUE)) {
  1238         pads->eospads--;
  1185         pads->eospads--;
  1239         data->abidata.ABI.eos = FALSE;
  1186         data->abidata.ABI.eos = FALSE;
  1240       }
  1187       }
  1241 
       
  1242       if (!gst_collect_pads_is_flushing (pads)) {
       
  1243         /* forward event if all pads are no longer flushing */
       
  1244         GST_DEBUG ("No more pads are flushing, forwarding FLUSH_STOP");
       
  1245         GST_OBJECT_UNLOCK (pads);
       
  1246         goto forward;
       
  1247       }
       
  1248       gst_event_unref (event);
       
  1249       GST_OBJECT_UNLOCK (pads);
  1188       GST_OBJECT_UNLOCK (pads);
  1250       goto done;
  1189 
       
  1190       /* forward event */
       
  1191       goto forward;
  1251     }
  1192     }
  1252     case GST_EVENT_EOS:
  1193     case GST_EVENT_EOS:
  1253     {
  1194     {
  1254       GST_OBJECT_LOCK (pads);
  1195       GST_OBJECT_LOCK (pads);
  1255       /* if the pad was not EOS, make it EOS and so we
  1196       /* if the pad was not EOS, make it EOS and so we
  1306       /* forward other events */
  1247       /* forward other events */
  1307       goto forward;
  1248       goto forward;
  1308   }
  1249   }
  1309 
  1250 
  1310 forward:
  1251 forward:
  1311   GST_DEBUG_OBJECT (pads, "forward unhandled event: %s",
       
  1312       GST_EVENT_TYPE_NAME (event));
       
  1313   res = gst_pad_event_default (pad, event);
  1252   res = gst_pad_event_default (pad, event);
  1314 
  1253 
  1315 done:
  1254 done:
  1316   unref_data (data);
  1255   unref_data (data);
  1317   return res;
  1256   return res;
  1334 static GstFlowReturn
  1273 static GstFlowReturn
  1335 gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
  1274 gst_collect_pads_chain (GstPad * pad, GstBuffer * buffer)
  1336 {
  1275 {
  1337   GstCollectData *data;
  1276   GstCollectData *data;
  1338   GstCollectPads *pads;
  1277   GstCollectPads *pads;
       
  1278   guint64 size;
  1339   GstFlowReturn ret;
  1279   GstFlowReturn ret;
  1340   GstBuffer **buffer_p;
  1280   GstBuffer **buffer_p;
  1341 
  1281 
  1342   GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
  1282   GST_DEBUG ("Got buffer for pad %s:%s", GST_DEBUG_PAD_NAME (pad));
  1343 
  1283 
  1348     goto no_data;
  1288     goto no_data;
  1349   ref_data (data);
  1289   ref_data (data);
  1350   GST_OBJECT_UNLOCK (pad);
  1290   GST_OBJECT_UNLOCK (pad);
  1351 
  1291 
  1352   pads = data->collect;
  1292   pads = data->collect;
       
  1293   size = GST_BUFFER_SIZE (buffer);
  1353 
  1294 
  1354   GST_OBJECT_LOCK (pads);
  1295   GST_OBJECT_LOCK (pads);
  1355   /* if not started, bail out */
  1296   /* if not started, bail out */
  1356   if (G_UNLIKELY (!pads->started))
  1297   if (G_UNLIKELY (!pads->started))
  1357     goto not_started;
  1298     goto not_started;
  1378       gst_segment_set_last_stop (&data->segment, GST_FORMAT_TIME, timestamp);
  1319       gst_segment_set_last_stop (&data->segment, GST_FORMAT_TIME, timestamp);
  1379   }
  1320   }
  1380 
  1321 
  1381   /* While we have data queued on this pad try to collect stuff */
  1322   /* While we have data queued on this pad try to collect stuff */
  1382   do {
  1323   do {
  1383     GST_DEBUG ("Pad %s:%s checking", GST_DEBUG_PAD_NAME (pad));
       
  1384     /* Check if our collected condition is matched and call the collected function
  1324     /* Check if our collected condition is matched and call the collected function
  1385      * if it is */
  1325      * if it is */
  1386     ret = gst_collect_pads_check_collected (pads);
  1326     ret = gst_collect_pads_check_collected (pads);
  1387     /* when an error occurs, we want to report this back to the caller ASAP
  1327     /* when an error occurs, we want to report this back to the caller ASAP
  1388      * without having to block if the buffer was not popped */
  1328      * without having to block if the buffer was not popped */
  1421       goto flushing;
  1361       goto flushing;
  1422   }
  1362   }
  1423   while (data->buffer != NULL);
  1363   while (data->buffer != NULL);
  1424 
  1364 
  1425 unlock_done:
  1365 unlock_done:
  1426   GST_DEBUG ("Pad %s:%s done", GST_DEBUG_PAD_NAME (pad));
       
  1427   GST_OBJECT_UNLOCK (pads);
  1366   GST_OBJECT_UNLOCK (pads);
  1428   unref_data (data);
  1367   unref_data (data);
  1429   gst_buffer_unref (buffer);
  1368   gst_buffer_unref (buffer);
  1430   return ret;
  1369   return ret;
  1431 
  1370