gstreamer_core/libs/gst/base/gstbasesrc.c
branchRCL_3
changeset 30 7e817e7e631c
parent 29 567bb019e3e3
--- a/gstreamer_core/libs/gst/base/gstbasesrc.c	Tue Aug 31 15:30:33 2010 +0300
+++ b/gstreamer_core/libs/gst/base/gstbasesrc.c	Wed Sep 01 12:16:41 2010 +0100
@@ -33,11 +33,14 @@
  *   <listitem><para>live sources</para></listitem>
  * </itemizedlist>
  *
+ * <refsect2>
+ * <para>
  * The source can be configured to operate in any #GstFormat with the
  * gst_base_src_set_format() method. The currently set format determines 
  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT 
  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
- *
+ * </para>
+ * <para>
  * #GstBaseSrc always supports push mode scheduling. If the following
  * conditions are met, it also supports pull mode scheduling:
  * <itemizedlist>
@@ -46,10 +49,12 @@
  *   <listitem><para>#GstBaseSrc::is_seekable returns %TRUE.</para>
  *   </listitem>
  * </itemizedlist>
- * 
+ * </para>
+ * <para>
  * Since 0.10.9, any #GstBaseSrc can enable pull based scheduling at any 
  * time by overriding #GstBaseSrc::check_get_range so that it returns %TRUE. 
- * 
+ * </para>
+ * <para>
  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
  * automatically seekable in push mode as well. The following conditions must 
  * be met to make the element seekable in push mode when the format is not
@@ -66,34 +71,40 @@
  *     #GstBaseSrc::do_seek is implemented, performs the seek and returns %TRUE.
  *   </para></listitem>
  * </itemizedlist>
- * 
+ * </para>
+ * <para>
  * When the element does not meet the requirements to operate in pull mode,
  * the offset and length in the #GstBaseSrc::create method should be ignored.
  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
  * element can operate in pull mode but only with specific offsets and
  * lengths, it is allowed to generate an error when the wrong values are passed
  * to the #GstBaseSrc::create function.
- * 
+ * </para>
+ * <para>
  * #GstBaseSrc has support for live sources. Live sources are sources that when
  * paused discard data, such as audio or video capture devices. A typical live
  * source also produces data at a fixed rate and thus provides a clock to publish
  * this rate.
  * Use gst_base_src_set_live() to activate the live source mode.
- * 
+ * </para>
+ * <para>
  * A live source does not produce data in the PAUSED state. This means that the 
  * #GstBaseSrc::create method will not be called in PAUSED but only in PLAYING.
  * To signal the pipeline that the element will not produce data, the return
  * value from the READY to PAUSED state will be #GST_STATE_CHANGE_NO_PREROLL.
- * 
+ * </para>
+ * <para>
  * A typical live source will timestamp the buffers it creates with the 
  * current running time of the pipeline. This is one reason why a live source
  * can only produce data in the PLAYING state, when the clock is actually 
  * distributed and running. 
- * 
+ * </para>
+ * <para>
  * Live sources that synchronize and block on the clock (an audio source, for
  * example) can since 0.10.12 use gst_base_src_wait_playing() when the ::create
  * function was interrupted by a state change to PAUSED.
- * 
+ * </para>
+ * <para>
  * The #GstBaseSrc::get_times method can be used to implement pseudo-live 
  * sources.
  * It only makes sense to implement the ::get_times function if the source is 
@@ -102,17 +113,21 @@
  * the timestamps are transformed into the current running_time.
  * The base source will then wait for the calculated running_time before pushing
  * out the buffer.
- * 
+ * </para>
+ * <para>
  * For live sources, the base class will by default report a latency of 0.
  * For pseudo live sources, the base class will by default measure the difference
  * between the first buffer timestamp and the start time of get_times and will
  * report this value as the latency. 
  * Subclasses should override the query function when this behaviour is not
  * acceptable.
- * 
+ * </para>
+ * <para>
  * There is only support in #GstBaseSrc for exactly one source pad, which 
  * should be named "src". A source implementation (subclass of #GstBaseSrc) 
  * should install a pad template in its class_init function, like so:
+ * </para>
+ * <para>
  * <programlisting>
  * static void
  * my_element_class_init (GstMyElementClass *klass)
@@ -126,8 +141,7 @@
  *   gst_element_class_set_details (gstelement_class, &amp;details);
  * }
  * </programlisting>
- *
- * <refsect2>
+ * </para>
  * <title>Controlled shutdown of live sources in applications</title>
  * <para>
  * Applications that record from a live source may want to stop recording
@@ -138,19 +152,22 @@
  * event down the pipeline. The application would then wait for an
  * EOS message posted on the pipeline's bus to know when all data has
  * been processed and the pipeline can safely be stopped.
- * 
+ * </para>
+ * <para>
  * Since GStreamer 0.10.16 an application may send an EOS event to a source
- * element to make it perform the EOS logic (send EOS event downstream or post a
- * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
+ * element to make it send an EOS event downstream. This can typically be done
  * with the gst_element_send_event() function on the element or its parent bin.
- * 
+ * </para>
+ * <para>
  * After the EOS has been sent to the element, the application should wait for
  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
  * received, it may safely shut down the entire pipeline.
- * 
+ * </para>
+ * <para>
  * The old behaviour for controlled shutdown introduced since GStreamer 0.10.3
  * is still available but deprecated as it is dangerous and less flexible.
- * 
+ * </para>
+ * <para>
  * Last reviewed on 2007-12-19 (0.10.16)
  * </para>
  * </refsect2>
@@ -226,8 +243,8 @@
   GstEvent *close_segment;
   GstEvent *start_segment;
 
-  /* if EOS is pending (atomic) */
-  gint pending_eos;
+  /* if EOS is pending */
+  gboolean pending_eos;
 
   /* startup latency is the time it takes between going to PLAYING and producing
    * the first BUFFER with running_time 0. This value is included in the latency
@@ -238,12 +255,6 @@
   GstClockTimeDiff ts_offset;
 
   gboolean do_timestamp;
-
-  /* stream sequence number */
-  guint32 seqnum;
-
-  /* pending tags to be pushed in the data stream */
-  GList *pending_tags;
 };
 
 static GstElementClass *parent_class = NULL;
@@ -261,10 +272,9 @@
 GType
 gst_base_src_get_type (void)
 {
-  static volatile gsize base_src_type = 0;
-
-  if (g_once_init_enter (&base_src_type)) {
-    GType _type;
+  static GType base_src_type = 0;
+
+  if (G_UNLIKELY (base_src_type == 0)) {
     static const GTypeInfo base_src_info = {
       sizeof (GstBaseSrcClass),
       (GBaseInitFunc) gst_base_src_base_init,
@@ -277,13 +287,11 @@
       (GInstanceInitFunc) gst_base_src_init,
     };
 
-    _type = g_type_register_static (GST_TYPE_ELEMENT,
+    base_src_type = g_type_register_static (GST_TYPE_ELEMENT,
         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
-    g_once_init_leave (&base_src_type, _type);
   }
   return base_src_type;
 }
-
 static GstCaps *gst_base_src_getcaps (GstPad * pad);
 static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
 static void gst_base_src_fixate (GstPad * pad, GstCaps * caps);
@@ -323,7 +331,6 @@
     guint length, GstBuffer ** buf);
 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
     guint length, GstBuffer ** buf);
-static gboolean gst_base_src_seekable (GstBaseSrc * src);
 
 static void
 gst_base_src_base_init (gpointer g_class)
@@ -350,21 +357,20 @@
 
   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
       g_param_spec_ulong ("blocksize", "Block size",
-          "Size in bytes to read per buffer (-1 = default)", 0, G_MAXULONG,
-          DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          "Size in bytes to read per buffer (0 = default)", 0, G_MAXULONG,
+          DEFAULT_BLOCKSIZE, G_PARAM_READWRITE));
   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
       g_param_spec_int ("num-buffers", "num-buffers",
-          "Number of buffers to output before sending EOS (-1 = unlimited)",
-          -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
-          G_PARAM_STATIC_STRINGS));
+          "Number of buffers to output before sending EOS", -1, G_MAXINT,
+          DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE));
   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
       g_param_spec_boolean ("typefind", "Typefind",
           "Run typefind before negotiating", DEFAULT_TYPEFIND,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE));
   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
       g_param_spec_boolean ("do-timestamp", "Do timestamp",
           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
-          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+          G_PARAM_READWRITE));
 
   gstelement_class->change_state =
       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
@@ -454,11 +460,6 @@
   event_p = &basesrc->data.ABI.pending_seek;
   gst_event_replace (event_p, NULL);
 
-  if (basesrc->priv->pending_tags) {
-    g_list_foreach (basesrc->priv->pending_tags, (GFunc) gst_event_unref, NULL);
-    g_list_free (basesrc->priv->pending_tags);
-  }
-
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
@@ -487,8 +488,6 @@
 GstFlowReturn
 gst_base_src_wait_playing (GstBaseSrc * src)
 {
-  g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
-
   /* block until the state changes, or we get a flush, or something */
   GST_DEBUG_OBJECT (src, "live source waiting for running state");
   GST_LIVE_WAIT (src);
@@ -527,8 +526,6 @@
 void
 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
 {
-  g_return_if_fail (GST_IS_BASE_SRC (src));
-
   GST_OBJECT_LOCK (src);
   src->is_live = live;
   GST_OBJECT_UNLOCK (src);
@@ -551,8 +548,6 @@
 {
   gboolean result;
 
-  g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
-
   GST_OBJECT_LOCK (src);
   result = src->is_live;
   GST_OBJECT_UNLOCK (src);
@@ -580,8 +575,6 @@
 void
 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
 {
-  g_return_if_fail (GST_IS_BASE_SRC (src));
-
   gst_segment_init (&src->segment, format);
 }
 
@@ -613,8 +606,6 @@
 {
   GstClockTime min;
 
-  g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
-
   GST_OBJECT_LOCK (src);
   if (live)
     *live = src->is_live;
@@ -641,59 +632,6 @@
 }
 
 /**
- * gst_base_src_set_blocksize:
- * @src: the source
- * @blocksize: the new blocksize in bytes
- *
- * Set the number of bytes that @src will push out with each buffer. When
- * @blocksize is set to -1, a default length will be used.
- *
- * Since: 0.10.22
- */
-#ifdef __SYMBIAN32__
-EXPORT_C
-#endif
-
-void
-gst_base_src_set_blocksize (GstBaseSrc * src, gulong blocksize)
-{
-  g_return_if_fail (GST_IS_BASE_SRC (src));
-
-  GST_OBJECT_LOCK (src);
-  src->blocksize = blocksize;
-  GST_OBJECT_UNLOCK (src);
-}
-
-/**
- * gst_base_src_get_blocksize:
- * @src: the source
- *
- * Get the number of bytes that @src will push out with each buffer.
- *
- * Returns: the number of bytes pushed with each buffer.
- *
- * Since: 0.10.22
- */
-#ifdef __SYMBIAN32__
-EXPORT_C
-#endif
-
-gulong
-gst_base_src_get_blocksize (GstBaseSrc * src)
-{
-  gulong res;
-
-  g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
-
-  GST_OBJECT_LOCK (src);
-  res = src->blocksize;
-  GST_OBJECT_UNLOCK (src);
-
-  return res;
-}
-
-
-/**
  * gst_base_src_set_do_timestamp:
  * @src: the source
  * @timestamp: enable or disable timestamping
@@ -711,8 +649,6 @@
 void
 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
 {
-  g_return_if_fail (GST_IS_BASE_SRC (src));
-
   GST_OBJECT_LOCK (src);
   src->priv->do_timestamp = timestamp;
   GST_OBJECT_UNLOCK (src);
@@ -737,8 +673,6 @@
 {
   gboolean res;
 
-  g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
-
   GST_OBJECT_LOCK (src);
   res = src->priv->do_timestamp;
   GST_OBJECT_UNLOCK (src);
@@ -863,7 +797,6 @@
 
       GST_DEBUG_OBJECT (src, "duration query in format %s",
           gst_format_get_name (format));
-
       switch (format) {
         case GST_FORMAT_PERCENT:
           gst_query_set_duration (query, GST_FORMAT_PERCENT,
@@ -899,20 +832,9 @@
 
     case GST_QUERY_SEEKING:
     {
-      GstFormat format;
-
-      gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
-      if (format == src->segment.format) {
-        gst_query_set_seeking (query, src->segment.format,
-            gst_base_src_seekable (src), 0, src->segment.duration);
-        res = TRUE;
-      } else {
-        /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
-        /* Don't reply to the query to make up for demuxers which don't
-         * handle the SEEKING query yet. Players like Totem will fall back
-         * to the duration when the SEEKING query isn't answered. */
-        res = FALSE;
-      }
+      gst_query_set_seeking (query, src->segment.format,
+          src->seekable, 0, src->segment.duration);
+      res = TRUE;
       break;
     }
     case GST_QUERY_SEGMENT:
@@ -977,46 +899,6 @@
     }
     case GST_QUERY_JITTER:
     case GST_QUERY_RATE:
-      res = FALSE;
-      break;
-    case GST_QUERY_BUFFERING:
-    {
-      GstFormat format;
-      gint64 start, stop, estimated;
-
-      gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
-
-      GST_DEBUG_OBJECT (src, "buffering query in format %s",
-          gst_format_get_name (format));
-
-      if (src->random_access) {
-        estimated = 0;
-        start = 0;
-        if (format == GST_FORMAT_PERCENT)
-          stop = GST_FORMAT_PERCENT_MAX;
-        else
-          stop = src->segment.duration;
-      } else {
-        estimated = -1;
-        start = -1;
-        stop = -1;
-      }
-      /* convert to required format. When the conversion fails, we can't answer
-       * the query. When the value is unknown, we can don't perform conversion
-       * but report TRUE. */
-      if (format != GST_FORMAT_PERCENT && stop != -1) {
-        res = gst_pad_query_convert (src->srcpad, src->segment.format,
-            stop, &format, &stop);
-      } else {
-        res = TRUE;
-      }
-      if (res && format != GST_FORMAT_PERCENT && start != -1)
-        res = gst_pad_query_convert (src->srcpad, src->segment.format,
-            start, &format, &start);
-
-      gst_query_set_buffering_range (query, format, start, stop, estimated);
-      break;
-    }
     default:
       res = FALSE;
       break;
@@ -1058,10 +940,9 @@
   } else if (segment->start == 0) {
     /* seek to start, we can implement a default for this. */
     segment->time = 0;
-  } else {
+    res = TRUE;
+  } else
     res = FALSE;
-    GST_INFO_OBJECT (src, "Can't do a default seek");
-  }
 
   return res;
 }
@@ -1215,7 +1096,7 @@
 static gboolean
 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
 {
-  gboolean res = TRUE, tres;
+  gboolean res = TRUE;
   gdouble rate;
   GstFormat seek_format, dest_format;
   GstSeekFlags flags;
@@ -1226,8 +1107,6 @@
   gboolean relative_seek = FALSE;
   gboolean seekseg_configured = FALSE;
   GstSegment seeksegment;
-  guint32 seqnum;
-  GstEvent *tevent;
 
   GST_DEBUG_OBJECT (src, "doing seek");
 
@@ -1243,8 +1122,7 @@
     if (dest_format != seek_format && !relative_seek) {
       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
        * here before taking the stream lock, otherwise we must convert it later,
-       * once we have the stream lock and can read the last configures segment
-       * start and stop positions */
+       * once we have the stream lock and can read the current position */
       gst_segment_init (&seeksegment, dest_format);
 
       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
@@ -1254,19 +1132,14 @@
     }
 
     flush = flags & GST_SEEK_FLAG_FLUSH;
-    seqnum = gst_event_get_seqnum (event);
   } else {
     flush = FALSE;
-    /* get next seqnum */
-    seqnum = gst_util_seqnum_next ();
   }
 
   /* send flush start */
-  if (flush) {
-    tevent = gst_event_new_flush_start ();
-    gst_event_set_seqnum (tevent, seqnum);
-    gst_pad_push_event (src->srcpad, tevent);
-  } else
+  if (flush)
+    gst_pad_push_event (src->srcpad, gst_event_new_flush_start ());
+  else
     gst_pad_pause_task (src->srcpad);
 
   /* unblock streaming thread. */
@@ -1276,14 +1149,6 @@
    * because the task is paused, our streaming thread stopped 
    * or because our peer is flushing. */
   GST_PAD_STREAM_LOCK (src->srcpad);
-  if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
-    /* we have seen this event before, issue a warning for now */
-    GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
-        seqnum);
-  } else {
-    src->priv->seqnum = seqnum;
-    GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
-  }
 
   gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL);
 
@@ -1326,11 +1191,9 @@
 
   /* and prepare to continue streaming */
   if (flush) {
-    tevent = gst_event_new_flush_stop ();
-    gst_event_set_seqnum (tevent, seqnum);
     /* send flush stop, peer will accept data and events again. We
      * are not yet providing data as we still have the STREAM_LOCK. */
-    gst_pad_push_event (src->srcpad, tevent);
+    gst_pad_push_event (src->srcpad, gst_event_new_flush_stop ());
   } else if (res && src->data.ABI.running) {
     /* we are running the current segment and doing a non-flushing seek, 
      * close the segment first based on the last_stop. */
@@ -1344,7 +1207,6 @@
         gst_event_new_new_segment_full (TRUE,
         src->segment.rate, src->segment.applied_rate, src->segment.format,
         src->segment.start, src->segment.last_stop, src->segment.time);
-    gst_event_set_seqnum (src->priv->close_segment, seqnum);
   }
 
   /* The subclass must have converted the segment to the processing format 
@@ -1361,13 +1223,9 @@
     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
 
     if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
-      GstMessage *message;
-
-      message = gst_message_new_segment_start (GST_OBJECT (src),
-          src->segment.format, src->segment.last_stop);
-      gst_message_set_seqnum (message, seqnum);
-
-      gst_element_post_message (GST_ELEMENT (src), message);
+      gst_element_post_message (GST_ELEMENT (src),
+          gst_message_new_segment_start (GST_OBJECT (src),
+              src->segment.format, src->segment.last_stop));
     }
 
     /* for deriving a stop position for the playback segment from the seek
@@ -1389,23 +1247,20 @@
           src->segment.rate, src->segment.applied_rate, src->segment.format,
           src->segment.last_stop, stop, src->segment.time);
     } else {
-      /* reverse, we send data from last_stop to start */
+      /* reverse, we send data from stop to last_stop */
       src->priv->start_segment =
           gst_event_new_new_segment_full (FALSE,
           src->segment.rate, src->segment.applied_rate, src->segment.format,
           src->segment.start, src->segment.last_stop, src->segment.time);
     }
-    gst_event_set_seqnum (src->priv->start_segment, seqnum);
   }
 
   src->priv->discont = TRUE;
   src->data.ABI.running = TRUE;
   /* and restart the task in case it got paused explicitely or by
    * the FLUSH_START event we pushed out. */
-  tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
+  gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
       src->srcpad);
-  if (res && !tres)
-    res = FALSE;
 
   /* and release the lock again so we can continue streaming */
   GST_PAD_STREAM_UNLOCK (src->srcpad);
@@ -1449,8 +1304,6 @@
 
   src = GST_BASE_SRC (element);
 
-  GST_DEBUG_OBJECT (src, "reveived %s event", GST_EVENT_TYPE_NAME (event));
-
   switch (GST_EVENT_TYPE (event)) {
       /* bidirectional events */
     case GST_EVENT_FLUSH_START:
@@ -1461,56 +1314,18 @@
 
       /* downstream serialized events */
     case GST_EVENT_EOS:
-    {
-      GstBaseSrcClass *bclass;
-
-      bclass = GST_BASE_SRC_GET_CLASS (src);
-
-      /* queue EOS and make sure the task or pull function performs the EOS
-       * actions. 
-       *
-       * We have two possibilities:
-       *
-       *  - Before we are to enter the _create function, we check the pending_eos
-       *    first and do EOS instead of entering it.
-       *  - If we are in the _create function or we did not manage to set the
-       *    flag fast enough and we are about to enter the _create function,
-       *    we unlock it so that we exit with WRONG_STATE immediatly. We then
-       *    check the EOS flag and do the EOS logic.
-       */
-      g_atomic_int_set (&src->priv->pending_eos, TRUE);
-      GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
-
-      /* unlock the _create function so that we can check the pending_eos flag
-       * and we can do EOS. This will eventually release the LIVE_LOCK again so
-       * that we can grab it and stop the unlock again. We don't take the stream
-       * lock so that this operation is guaranteed to never block. */
-      if (bclass->unlock)
-        bclass->unlock (src);
-
-      GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
-
+      /* queue EOS and make sure the task or pull function 
+       * performs the EOS actions. */
       GST_LIVE_LOCK (src);
-      GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
-      /* now stop the unlock of the streaming thread again. Grabbing the live
-       * lock is enough because that protects the create function. */
-      if (bclass->unlock_stop)
-        bclass->unlock_stop (src);
+      src->priv->pending_eos = TRUE;
       GST_LIVE_UNLOCK (src);
-
       result = TRUE;
       break;
-    }
     case GST_EVENT_NEWSEGMENT:
       /* sending random NEWSEGMENT downstream can break sync. */
       break;
     case GST_EVENT_TAG:
-      /* Insert tag in the dataflow */
-      GST_OBJECT_LOCK (src);
-      src->priv->pending_tags = g_list_append (src->priv->pending_tags, event);
-      GST_OBJECT_UNLOCK (src);
-      event = NULL;
-      result = TRUE;
+      /* sending tags could be useful, FIXME insert in dataflow */
       break;
     case GST_EVENT_BUFFERSIZE:
       /* does not seem to make much sense currently */
@@ -1531,7 +1346,6 @@
       GST_OBJECT_UNLOCK (src->srcpad);
 
       if (started) {
-        GST_DEBUG_OBJECT (src, "performing seek");
         /* when we are running in push mode, we can execute the
          * seek right now, we need to unlock. */
         result = gst_base_src_perform_seek (src, event, TRUE);
@@ -1541,7 +1355,6 @@
         /* else we store the event and execute the seek when we
          * get activated */
         GST_OBJECT_LOCK (src);
-        GST_DEBUG_OBJECT (src, "queueing seek");
         event_p = &src->data.ABI.pending_seek;
         gst_event_replace ((GstEvent **) event_p, event);
         GST_OBJECT_UNLOCK (src);
@@ -1595,17 +1408,6 @@
 }
 
 static gboolean
-gst_base_src_seekable (GstBaseSrc * src)
-{
-  GstBaseSrcClass *bclass;
-  bclass = GST_BASE_SRC_GET_CLASS (src);
-  if (bclass->is_seekable)
-    return bclass->is_seekable (src);
-  else
-    return FALSE;
-}
-
-static gboolean
 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
 {
   gboolean result;
@@ -1613,7 +1415,7 @@
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_SEEK:
       /* is normally called when in push mode */
-      if (!gst_base_src_seekable (src))
+      if (!src->seekable)
         goto not_seekable;
 
       result = gst_base_src_perform_seek (src, event, TRUE);
@@ -1679,7 +1481,7 @@
 
   switch (prop_id) {
     case PROP_BLOCKSIZE:
-      gst_base_src_set_blocksize (src, g_value_get_ulong (value));
+      src->blocksize = g_value_get_ulong (value);
       break;
     case PROP_NUM_BUFFERS:
       src->num_buffers = g_value_get_int (value);
@@ -1688,7 +1490,7 @@
       src->data.ABI.typefind = g_value_get_boolean (value);
       break;
     case PROP_DO_TIMESTAMP:
-      gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
+      src->priv->do_timestamp = g_value_get_boolean (value);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -1706,7 +1508,7 @@
 
   switch (prop_id) {
     case PROP_BLOCKSIZE:
-      g_value_set_ulong (value, gst_base_src_get_blocksize (src));
+      g_value_set_ulong (value, src->blocksize);
       break;
     case PROP_NUM_BUFFERS:
       g_value_set_int (value, src->num_buffers);
@@ -1715,7 +1517,7 @@
       g_value_set_boolean (value, src->data.ABI.typefind);
       break;
     case PROP_DO_TIMESTAMP:
-      g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
+      g_value_set_boolean (value, src->priv->do_timestamp);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -2008,28 +1810,11 @@
       src->num_buffers_left--;
   }
 
-  /* don't enter the create function if a pending EOS event was set. For the
-   * logic of the pending_eos, check the event function of this class. */
-  if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
-    goto eos;
-
   GST_DEBUG_OBJECT (src,
       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
       G_GINT64_FORMAT, offset, length, src->segment.time);
 
   ret = bclass->create (src, offset, length, buf);
-
-  /* The create function could be unlocked because we have a pending EOS. It's
-   * possible that we have a valid buffer from create that we need to
-   * discard when the create function returned _OK. */
-  if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
-    if (ret == GST_FLOW_OK) {
-      gst_buffer_unref (*buf);
-      *buf = NULL;
-    }
-    goto eos;
-  }
-
   if (G_UNLIKELY (ret != GST_FLOW_OK))
     goto not_ok;
 
@@ -2038,10 +1823,6 @@
       && GST_BUFFER_TIMESTAMP (*buf) == -1)
     GST_BUFFER_TIMESTAMP (*buf) = 0;
 
-  /* set pad caps on the buffer if the buffer had no caps */
-  if (GST_BUFFER_CAPS (*buf) == NULL)
-    gst_buffer_set_caps (*buf, GST_PAD_CAPS (src->srcpad));
-
   /* now sync before pushing the buffer */
   status = gst_base_src_do_sync (src, *buf);
 
@@ -2049,6 +1830,9 @@
   if (G_UNLIKELY (src->priv->flushing))
     goto flushing;
 
+  if (G_UNLIKELY (src->priv->pending_eos))
+    goto eos;
+
   switch (status) {
     case GST_CLOCK_EARLY:
       /* the buffer is too late. We currently don't drop the buffer. */
@@ -2125,6 +1909,8 @@
 eos:
   {
     GST_DEBUG_OBJECT (src, "we are EOS");
+    gst_buffer_unref (*buf);
+    *buf = NULL;
     return GST_FLOW_UNEXPECTED;
   }
 }
@@ -2142,6 +1928,10 @@
   if (G_UNLIKELY (src->priv->flushing))
     goto flushing;
 
+  /* if we're EOS, return right away */
+  if (G_UNLIKELY (src->priv->pending_eos))
+    goto eos;
+
   res = gst_base_src_get_range (src, offset, length, buf);
 
 done:
@@ -2158,6 +1948,12 @@
     res = GST_FLOW_WRONG_STATE;
     goto done;
   }
+eos:
+  {
+    GST_DEBUG_OBJECT (src, "we are EOS");
+    res = GST_FLOW_UNEXPECTED;
+    goto done;
+  }
 }
 
 static gboolean
@@ -2226,17 +2022,19 @@
   gint64 position;
   gboolean eos;
   gulong blocksize;
-  GList *tags, *tmp;
 
   eos = FALSE;
 
   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
 
   GST_LIVE_LOCK (src);
-
   if (G_UNLIKELY (src->priv->flushing))
     goto flushing;
 
+  /* if we're EOS, return right away */
+  if (G_UNLIKELY (src->priv->pending_eos))
+    goto eos;
+
   src->priv->last_sent_eos = FALSE;
 
   blocksize = src->blocksize;
@@ -2258,9 +2056,6 @@
   } else
     position = -1;
 
-  GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %lu",
-      GST_TIME_ARGS (position), blocksize);
-
   ret = gst_base_src_get_range (src, position, blocksize, &buf);
   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
@@ -2282,21 +2077,6 @@
     src->priv->start_segment = NULL;
   }
 
-  GST_OBJECT_LOCK (src);
-  /* take the tags */
-  tags = src->priv->pending_tags;
-  src->priv->pending_tags = NULL;
-  GST_OBJECT_UNLOCK (src);
-
-  /* Push out pending tags if any */
-  if (G_UNLIKELY (tags != NULL)) {
-    for (tmp = tags; tmp; tmp = g_list_next (tmp)) {
-      GstEvent *ev = (GstEvent *) tmp->data;
-      gst_pad_push_event (pad, ev);
-    }
-    g_list_free (tags);
-  }
-
   /* figure out the new position */
   switch (src->segment.format) {
     case GST_FORMAT_BYTES:
@@ -2393,10 +2173,16 @@
     ret = GST_FLOW_WRONG_STATE;
     goto pause;
   }
+eos:
+  {
+    GST_DEBUG_OBJECT (src, "we are EOS");
+    GST_LIVE_UNLOCK (src);
+    ret = GST_FLOW_UNEXPECTED;
+    goto pause;
+  }
 pause:
   {
     const gchar *reason = gst_flow_get_name (ret);
-    GstEvent *event;
 
     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
     src->data.ABI.running = FALSE;
@@ -2405,27 +2191,20 @@
       if (ret == GST_FLOW_UNEXPECTED) {
         /* perform EOS logic */
         if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
-          GstMessage *message;
-
-          message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
-              src->segment.format, src->segment.last_stop);
-          gst_message_set_seqnum (message, src->priv->seqnum);
-          gst_element_post_message (GST_ELEMENT_CAST (src), message);
+          gst_element_post_message (GST_ELEMENT_CAST (src),
+              gst_message_new_segment_done (GST_OBJECT_CAST (src),
+                  src->segment.format, src->segment.last_stop));
         } else {
-          event = gst_event_new_eos ();
-          gst_event_set_seqnum (event, src->priv->seqnum);
-          gst_pad_push_event (pad, event);
+          gst_pad_push_event (pad, gst_event_new_eos ());
           src->priv->last_sent_eos = TRUE;
         }
       } else {
-        event = gst_event_new_eos ();
-        gst_event_set_seqnum (event, src->priv->seqnum);
         /* for fatal errors we post an error message, post the error
          * first so the app knows about the error first. */
         GST_ELEMENT_ERROR (src, STREAM, FAILED,
             (_("Internal data flow error.")),
             ("streaming task paused, reason %s (%d)", reason, ret));
-        gst_pad_push_event (pad, event);
+        gst_pad_push_event (pad, gst_event_new_eos ());
         src->priv->last_sent_eos = TRUE;
       }
     }
@@ -2462,9 +2241,6 @@
   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
     goto no_nego_needed;
 
-  if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
-    goto no_caps;
-
   /* get the peer caps */
   peercaps = gst_pad_peer_get_caps (GST_BASE_SRC_PAD (basesrc));
   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
@@ -2499,14 +2275,12 @@
          * nego is not needed */
         result = TRUE;
       } else if (gst_caps_is_fixed (caps)) {
-        /* yay, fixed caps, use those then, it's possible that the subclass does
-         * not accept this caps after all and we have to fail. */
-        result = gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
+        /* yay, fixed caps, use those then */
+        gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
+        result = TRUE;
       }
     }
     gst_caps_unref (caps);
-  } else {
-    GST_DEBUG_OBJECT (basesrc, "no common caps");
   }
   return result;
 
@@ -2517,15 +2291,6 @@
       gst_caps_unref (thiscaps);
     return TRUE;
   }
-no_caps:
-  {
-    GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
-        ("No supported formats found"),
-        ("This element did not produce valid caps"));
-    if (thiscaps)
-      gst_caps_unref (thiscaps);
-    return TRUE;
-  }
 }
 
 static gboolean
@@ -2548,7 +2313,6 @@
   GstBaseSrcClass *bclass;
   gboolean result;
   guint64 size;
-  gboolean seekable;
 
   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
     return TRUE;
@@ -2593,11 +2357,16 @@
       G_GINT64_FORMAT, basesrc->segment.format, result, size,
       basesrc->segment.duration);
 
-  seekable = gst_base_src_seekable (basesrc);
-  GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
+  /* check if we can seek */
+  if (bclass->is_seekable)
+    basesrc->seekable = bclass->is_seekable (basesrc);
+  else
+    basesrc->seekable = FALSE;
+
+  GST_DEBUG_OBJECT (basesrc, "is seekable: %d", basesrc->seekable);
 
   /* update for random access flag */
-  basesrc->random_access = seekable &&
+  basesrc->random_access = basesrc->seekable &&
       basesrc->segment.format == GST_FORMAT_BYTES;
 
   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
@@ -2606,9 +2375,7 @@
   if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) {
     GstCaps *caps;
 
-    if (!(caps = gst_type_find_helper (basesrc->srcpad, size)))
-      goto typefind_failed;
-
+    caps = gst_type_find_helper (basesrc->srcpad, size);
     gst_pad_set_caps (basesrc->srcpad, caps);
     gst_caps_unref (caps);
   } else {
@@ -2635,14 +2402,6 @@
     gst_base_src_stop (basesrc);
     return FALSE;
   }
-typefind_failed:
-  {
-    GST_DEBUG_OBJECT (basesrc, "could not typefind, stopping");
-    GST_ELEMENT_ERROR (basesrc, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
-    /* we must call stop */
-    gst_base_src_stop (basesrc);
-    return FALSE;
-  }
 }
 
 static gboolean
@@ -2693,9 +2452,8 @@
   if (flushing) {
     /* if we are locked in the live lock, signal it to make it flush */
     basesrc->live_running = TRUE;
-
     /* clear pending EOS if any */
-    g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
+    basesrc->priv->pending_eos = FALSE;
 
     /* step 1, now that we have the LIVE lock, clear our unlock request */
     if (bclass->unlock_stop)
@@ -2938,7 +2696,7 @@
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
     {
-      GstEvent **event_p, *event;
+      GstEvent **event_p;
 
       /* we don't need to unblock anything here, the pad deactivation code
        * already did this */
@@ -2948,12 +2706,10 @@
        * the EOS event to the element */
       if (!basesrc->priv->last_sent_eos) {
         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
-        event = gst_event_new_eos ();
-        gst_event_set_seqnum (event, basesrc->priv->seqnum);
-        gst_pad_push_event (basesrc->srcpad, event);
+        gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ());
         basesrc->priv->last_sent_eos = TRUE;
       }
-      g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
+      basesrc->priv->pending_eos = FALSE;
       event_p = &basesrc->data.ABI.pending_seek;
       gst_event_replace (event_p, NULL);
       event_p = &basesrc->priv->close_segment;