--- a/gstreamer_core/libs/gst/base/gstbasesrc.c Wed Mar 31 22:03:18 2010 +0300
+++ b/gstreamer_core/libs/gst/base/gstbasesrc.c Tue Aug 31 15:30:33 2010 +0300
@@ -33,14 +33,11 @@
* <listitem><para>live sources</para></listitem>
* </itemizedlist>
*
- * <refsect2>
- * <para>
* The source can be configured to operate in any #GstFormat with the
* gst_base_src_set_format() method. The currently set format determines
* the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT
* events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
- * </para>
- * <para>
+ *
* #GstBaseSrc always supports push mode scheduling. If the following
* conditions are met, it also supports pull mode scheduling:
* <itemizedlist>
@@ -49,12 +46,10 @@
* <listitem><para>#GstBaseSrc::is_seekable returns %TRUE.</para>
* </listitem>
* </itemizedlist>
- * </para>
- * <para>
+ *
* Since 0.10.9, any #GstBaseSrc can enable pull based scheduling at any
* time by overriding #GstBaseSrc::check_get_range so that it returns %TRUE.
- * </para>
- * <para>
+ *
* If all the conditions are met for operating in pull mode, #GstBaseSrc is
* automatically seekable in push mode as well. The following conditions must
* be met to make the element seekable in push mode when the format is not
@@ -71,40 +66,34 @@
* #GstBaseSrc::do_seek is implemented, performs the seek and returns %TRUE.
* </para></listitem>
* </itemizedlist>
- * </para>
- * <para>
+ *
* When the element does not meet the requirements to operate in pull mode,
* the offset and length in the #GstBaseSrc::create method should be ignored.
* It is recommended to subclass #GstPushSrc instead, in this situation. If the
* element can operate in pull mode but only with specific offsets and
* lengths, it is allowed to generate an error when the wrong values are passed
* to the #GstBaseSrc::create function.
- * </para>
- * <para>
+ *
* #GstBaseSrc has support for live sources. Live sources are sources that when
* paused discard data, such as audio or video capture devices. A typical live
* source also produces data at a fixed rate and thus provides a clock to publish
* this rate.
* Use gst_base_src_set_live() to activate the live source mode.
- * </para>
- * <para>
+ *
* A live source does not produce data in the PAUSED state. This means that the
* #GstBaseSrc::create method will not be called in PAUSED but only in PLAYING.
* To signal the pipeline that the element will not produce data, the return
* value from the READY to PAUSED state will be #GST_STATE_CHANGE_NO_PREROLL.
- * </para>
- * <para>
+ *
* A typical live source will timestamp the buffers it creates with the
* current running time of the pipeline. This is one reason why a live source
* can only produce data in the PLAYING state, when the clock is actually
* distributed and running.
- * </para>
- * <para>
+ *
* Live sources that synchronize and block on the clock (an audio source, for
* example) can since 0.10.12 use gst_base_src_wait_playing() when the ::create
* function was interrupted by a state change to PAUSED.
- * </para>
- * <para>
+ *
* The #GstBaseSrc::get_times method can be used to implement pseudo-live
* sources.
* It only makes sense to implement the ::get_times function if the source is
@@ -113,21 +102,17 @@
* the timestamps are transformed into the current running_time.
* The base source will then wait for the calculated running_time before pushing
* out the buffer.
- * </para>
- * <para>
+ *
* For live sources, the base class will by default report a latency of 0.
* For pseudo live sources, the base class will by default measure the difference
* between the first buffer timestamp and the start time of get_times and will
* report this value as the latency.
* Subclasses should override the query function when this behaviour is not
* acceptable.
- * </para>
- * <para>
+ *
* There is only support in #GstBaseSrc for exactly one source pad, which
* should be named "src". A source implementation (subclass of #GstBaseSrc)
* should install a pad template in its class_init function, like so:
- * </para>
- * <para>
* <programlisting>
* static void
* my_element_class_init (GstMyElementClass *klass)
@@ -141,7 +126,8 @@
* gst_element_class_set_details (gstelement_class, &details);
* }
* </programlisting>
- * </para>
+ *
+ * <refsect2>
* <title>Controlled shutdown of live sources in applications</title>
* <para>
* Applications that record from a live source may want to stop recording
@@ -152,22 +138,19 @@
* event down the pipeline. The application would then wait for an
* EOS message posted on the pipeline's bus to know when all data has
* been processed and the pipeline can safely be stopped.
- * </para>
- * <para>
+ *
* Since GStreamer 0.10.16 an application may send an EOS event to a source
- * element to make it send an EOS event downstream. This can typically be done
+ * element to make it perform the EOS logic (send EOS event downstream or post a
+ * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
* with the gst_element_send_event() function on the element or its parent bin.
- * </para>
- * <para>
+ *
* After the EOS has been sent to the element, the application should wait for
* an EOS message to be posted on the pipeline's bus. Once this EOS message is
* received, it may safely shut down the entire pipeline.
- * </para>
- * <para>
+ *
* The old behaviour for controlled shutdown introduced since GStreamer 0.10.3
* is still available but deprecated as it is dangerous and less flexible.
- * </para>
- * <para>
+ *
* Last reviewed on 2007-12-19 (0.10.16)
* </para>
* </refsect2>
@@ -243,8 +226,8 @@
GstEvent *close_segment;
GstEvent *start_segment;
- /* if EOS is pending */
- gboolean pending_eos;
+ /* if EOS is pending (atomic) */
+ gint pending_eos;
/* startup latency is the time it takes between going to PLAYING and producing
* the first BUFFER with running_time 0. This value is included in the latency
@@ -255,6 +238,12 @@
GstClockTimeDiff ts_offset;
gboolean do_timestamp;
+
+ /* stream sequence number */
+ guint32 seqnum;
+
+ /* pending tags to be pushed in the data stream */
+ GList *pending_tags;
};
static GstElementClass *parent_class = NULL;
@@ -272,9 +261,10 @@
GType
gst_base_src_get_type (void)
{
- static GType base_src_type = 0;
-
- if (G_UNLIKELY (base_src_type == 0)) {
+ static volatile gsize base_src_type = 0;
+
+ if (g_once_init_enter (&base_src_type)) {
+ GType _type;
static const GTypeInfo base_src_info = {
sizeof (GstBaseSrcClass),
(GBaseInitFunc) gst_base_src_base_init,
@@ -287,11 +277,13 @@
(GInstanceInitFunc) gst_base_src_init,
};
- base_src_type = g_type_register_static (GST_TYPE_ELEMENT,
+ _type = g_type_register_static (GST_TYPE_ELEMENT,
"GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
+ g_once_init_leave (&base_src_type, _type);
}
return base_src_type;
}
+
static GstCaps *gst_base_src_getcaps (GstPad * pad);
static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
static void gst_base_src_fixate (GstPad * pad, GstCaps * caps);
@@ -331,6 +323,7 @@
guint length, GstBuffer ** buf);
static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
guint length, GstBuffer ** buf);
+static gboolean gst_base_src_seekable (GstBaseSrc * src);
static void
gst_base_src_base_init (gpointer g_class)
@@ -357,20 +350,21 @@
g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
g_param_spec_ulong ("blocksize", "Block size",
- "Size in bytes to read per buffer (0 = default)", 0, G_MAXULONG,
- DEFAULT_BLOCKSIZE, G_PARAM_READWRITE));
+ "Size in bytes to read per buffer (-1 = default)", 0, G_MAXULONG,
+ DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
g_param_spec_int ("num-buffers", "num-buffers",
- "Number of buffers to output before sending EOS", -1, G_MAXINT,
- DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE));
+ "Number of buffers to output before sending EOS (-1 = unlimited)",
+ -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
+ G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_TYPEFIND,
g_param_spec_boolean ("typefind", "Typefind",
"Run typefind before negotiating", DEFAULT_TYPEFIND,
- G_PARAM_READWRITE));
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
g_param_spec_boolean ("do-timestamp", "Do timestamp",
"Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
- G_PARAM_READWRITE));
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_base_src_change_state);
@@ -460,6 +454,11 @@
event_p = &basesrc->data.ABI.pending_seek;
gst_event_replace (event_p, NULL);
+ if (basesrc->priv->pending_tags) {
+ g_list_foreach (basesrc->priv->pending_tags, (GFunc) gst_event_unref, NULL);
+ g_list_free (basesrc->priv->pending_tags);
+ }
+
G_OBJECT_CLASS (parent_class)->finalize (object);
}
@@ -488,6 +487,8 @@
GstFlowReturn
gst_base_src_wait_playing (GstBaseSrc * src)
{
+ g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
+
/* block until the state changes, or we get a flush, or something */
GST_DEBUG_OBJECT (src, "live source waiting for running state");
GST_LIVE_WAIT (src);
@@ -526,6 +527,8 @@
void
gst_base_src_set_live (GstBaseSrc * src, gboolean live)
{
+ g_return_if_fail (GST_IS_BASE_SRC (src));
+
GST_OBJECT_LOCK (src);
src->is_live = live;
GST_OBJECT_UNLOCK (src);
@@ -548,6 +551,8 @@
{
gboolean result;
+ g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
+
GST_OBJECT_LOCK (src);
result = src->is_live;
GST_OBJECT_UNLOCK (src);
@@ -575,6 +580,8 @@
void
gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
{
+ g_return_if_fail (GST_IS_BASE_SRC (src));
+
gst_segment_init (&src->segment, format);
}
@@ -606,6 +613,8 @@
{
GstClockTime min;
+ g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
+
GST_OBJECT_LOCK (src);
if (live)
*live = src->is_live;
@@ -632,6 +641,59 @@
}
/**
+ * gst_base_src_set_blocksize:
+ * @src: the source
+ * @blocksize: the new blocksize in bytes
+ *
+ * Set the number of bytes that @src will push out with each buffer. When
+ * @blocksize is set to -1, a default length will be used.
+ *
+ * Since: 0.10.22
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+void
+gst_base_src_set_blocksize (GstBaseSrc * src, gulong blocksize)
+{
+ g_return_if_fail (GST_IS_BASE_SRC (src));
+
+ GST_OBJECT_LOCK (src);
+ src->blocksize = blocksize;
+ GST_OBJECT_UNLOCK (src);
+}
+
+/**
+ * gst_base_src_get_blocksize:
+ * @src: the source
+ *
+ * Get the number of bytes that @src will push out with each buffer.
+ *
+ * Returns: the number of bytes pushed with each buffer.
+ *
+ * Since: 0.10.22
+ */
+#ifdef __SYMBIAN32__
+EXPORT_C
+#endif
+
+gulong
+gst_base_src_get_blocksize (GstBaseSrc * src)
+{
+ gulong res;
+
+ g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
+
+ GST_OBJECT_LOCK (src);
+ res = src->blocksize;
+ GST_OBJECT_UNLOCK (src);
+
+ return res;
+}
+
+
+/**
* gst_base_src_set_do_timestamp:
* @src: the source
* @timestamp: enable or disable timestamping
@@ -649,6 +711,8 @@
void
gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
{
+ g_return_if_fail (GST_IS_BASE_SRC (src));
+
GST_OBJECT_LOCK (src);
src->priv->do_timestamp = timestamp;
GST_OBJECT_UNLOCK (src);
@@ -673,6 +737,8 @@
{
gboolean res;
+ g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
+
GST_OBJECT_LOCK (src);
res = src->priv->do_timestamp;
GST_OBJECT_UNLOCK (src);
@@ -797,6 +863,7 @@
GST_DEBUG_OBJECT (src, "duration query in format %s",
gst_format_get_name (format));
+
switch (format) {
case GST_FORMAT_PERCENT:
gst_query_set_duration (query, GST_FORMAT_PERCENT,
@@ -832,9 +899,20 @@
case GST_QUERY_SEEKING:
{
- gst_query_set_seeking (query, src->segment.format,
- src->seekable, 0, src->segment.duration);
- res = TRUE;
+ GstFormat format;
+
+ gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
+ if (format == src->segment.format) {
+ gst_query_set_seeking (query, src->segment.format,
+ gst_base_src_seekable (src), 0, src->segment.duration);
+ res = TRUE;
+ } else {
+ /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
+ /* Don't reply to the query to make up for demuxers which don't
+ * handle the SEEKING query yet. Players like Totem will fall back
+ * to the duration when the SEEKING query isn't answered. */
+ res = FALSE;
+ }
break;
}
case GST_QUERY_SEGMENT:
@@ -899,6 +977,46 @@
}
case GST_QUERY_JITTER:
case GST_QUERY_RATE:
+ res = FALSE;
+ break;
+ case GST_QUERY_BUFFERING:
+ {
+ GstFormat format;
+ gint64 start, stop, estimated;
+
+ gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
+
+ GST_DEBUG_OBJECT (src, "buffering query in format %s",
+ gst_format_get_name (format));
+
+ if (src->random_access) {
+ estimated = 0;
+ start = 0;
+ if (format == GST_FORMAT_PERCENT)
+ stop = GST_FORMAT_PERCENT_MAX;
+ else
+ stop = src->segment.duration;
+ } else {
+ estimated = -1;
+ start = -1;
+ stop = -1;
+ }
+ /* convert to required format. When the conversion fails, we can't answer
+ * the query. When the value is unknown, we can don't perform conversion
+ * but report TRUE. */
+ if (format != GST_FORMAT_PERCENT && stop != -1) {
+ res = gst_pad_query_convert (src->srcpad, src->segment.format,
+ stop, &format, &stop);
+ } else {
+ res = TRUE;
+ }
+ if (res && format != GST_FORMAT_PERCENT && start != -1)
+ res = gst_pad_query_convert (src->srcpad, src->segment.format,
+ start, &format, &start);
+
+ gst_query_set_buffering_range (query, format, start, stop, estimated);
+ break;
+ }
default:
res = FALSE;
break;
@@ -940,9 +1058,10 @@
} else if (segment->start == 0) {
/* seek to start, we can implement a default for this. */
segment->time = 0;
- res = TRUE;
- } else
+ } else {
res = FALSE;
+ GST_INFO_OBJECT (src, "Can't do a default seek");
+ }
return res;
}
@@ -1096,7 +1215,7 @@
static gboolean
gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
{
- gboolean res = TRUE;
+ gboolean res = TRUE, tres;
gdouble rate;
GstFormat seek_format, dest_format;
GstSeekFlags flags;
@@ -1107,6 +1226,8 @@
gboolean relative_seek = FALSE;
gboolean seekseg_configured = FALSE;
GstSegment seeksegment;
+ guint32 seqnum;
+ GstEvent *tevent;
GST_DEBUG_OBJECT (src, "doing seek");
@@ -1122,7 +1243,8 @@
if (dest_format != seek_format && !relative_seek) {
/* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
* here before taking the stream lock, otherwise we must convert it later,
- * once we have the stream lock and can read the current position */
+ * once we have the stream lock and can read the last configures segment
+ * start and stop positions */
gst_segment_init (&seeksegment, dest_format);
if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
@@ -1132,14 +1254,19 @@
}
flush = flags & GST_SEEK_FLAG_FLUSH;
+ seqnum = gst_event_get_seqnum (event);
} else {
flush = FALSE;
+ /* get next seqnum */
+ seqnum = gst_util_seqnum_next ();
}
/* send flush start */
- if (flush)
- gst_pad_push_event (src->srcpad, gst_event_new_flush_start ());
- else
+ if (flush) {
+ tevent = gst_event_new_flush_start ();
+ gst_event_set_seqnum (tevent, seqnum);
+ gst_pad_push_event (src->srcpad, tevent);
+ } else
gst_pad_pause_task (src->srcpad);
/* unblock streaming thread. */
@@ -1149,6 +1276,14 @@
* because the task is paused, our streaming thread stopped
* or because our peer is flushing. */
GST_PAD_STREAM_LOCK (src->srcpad);
+ if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
+ /* we have seen this event before, issue a warning for now */
+ GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
+ seqnum);
+ } else {
+ src->priv->seqnum = seqnum;
+ GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
+ }
gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL);
@@ -1191,9 +1326,11 @@
/* and prepare to continue streaming */
if (flush) {
+ tevent = gst_event_new_flush_stop ();
+ gst_event_set_seqnum (tevent, seqnum);
/* send flush stop, peer will accept data and events again. We
* are not yet providing data as we still have the STREAM_LOCK. */
- gst_pad_push_event (src->srcpad, gst_event_new_flush_stop ());
+ gst_pad_push_event (src->srcpad, tevent);
} else if (res && src->data.ABI.running) {
/* we are running the current segment and doing a non-flushing seek,
* close the segment first based on the last_stop. */
@@ -1207,6 +1344,7 @@
gst_event_new_new_segment_full (TRUE,
src->segment.rate, src->segment.applied_rate, src->segment.format,
src->segment.start, src->segment.last_stop, src->segment.time);
+ gst_event_set_seqnum (src->priv->close_segment, seqnum);
}
/* The subclass must have converted the segment to the processing format
@@ -1223,9 +1361,13 @@
memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
- gst_element_post_message (GST_ELEMENT (src),
- gst_message_new_segment_start (GST_OBJECT (src),
- src->segment.format, src->segment.last_stop));
+ GstMessage *message;
+
+ message = gst_message_new_segment_start (GST_OBJECT (src),
+ src->segment.format, src->segment.last_stop);
+ gst_message_set_seqnum (message, seqnum);
+
+ gst_element_post_message (GST_ELEMENT (src), message);
}
/* for deriving a stop position for the playback segment from the seek
@@ -1247,20 +1389,23 @@
src->segment.rate, src->segment.applied_rate, src->segment.format,
src->segment.last_stop, stop, src->segment.time);
} else {
- /* reverse, we send data from stop to last_stop */
+ /* reverse, we send data from last_stop to start */
src->priv->start_segment =
gst_event_new_new_segment_full (FALSE,
src->segment.rate, src->segment.applied_rate, src->segment.format,
src->segment.start, src->segment.last_stop, src->segment.time);
}
+ gst_event_set_seqnum (src->priv->start_segment, seqnum);
}
src->priv->discont = TRUE;
src->data.ABI.running = TRUE;
/* and restart the task in case it got paused explicitely or by
* the FLUSH_START event we pushed out. */
- gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
+ tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
src->srcpad);
+ if (res && !tres)
+ res = FALSE;
/* and release the lock again so we can continue streaming */
GST_PAD_STREAM_UNLOCK (src->srcpad);
@@ -1304,6 +1449,8 @@
src = GST_BASE_SRC (element);
+ GST_DEBUG_OBJECT (src, "reveived %s event", GST_EVENT_TYPE_NAME (event));
+
switch (GST_EVENT_TYPE (event)) {
/* bidirectional events */
case GST_EVENT_FLUSH_START:
@@ -1314,18 +1461,56 @@
/* downstream serialized events */
case GST_EVENT_EOS:
- /* queue EOS and make sure the task or pull function
- * performs the EOS actions. */
+ {
+ GstBaseSrcClass *bclass;
+
+ bclass = GST_BASE_SRC_GET_CLASS (src);
+
+ /* queue EOS and make sure the task or pull function performs the EOS
+ * actions.
+ *
+ * We have two possibilities:
+ *
+ * - Before we are to enter the _create function, we check the pending_eos
+ * first and do EOS instead of entering it.
+ * - If we are in the _create function or we did not manage to set the
+ * flag fast enough and we are about to enter the _create function,
+ * we unlock it so that we exit with WRONG_STATE immediatly. We then
+ * check the EOS flag and do the EOS logic.
+ */
+ g_atomic_int_set (&src->priv->pending_eos, TRUE);
+ GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
+
+ /* unlock the _create function so that we can check the pending_eos flag
+ * and we can do EOS. This will eventually release the LIVE_LOCK again so
+ * that we can grab it and stop the unlock again. We don't take the stream
+ * lock so that this operation is guaranteed to never block. */
+ if (bclass->unlock)
+ bclass->unlock (src);
+
+ GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
+
GST_LIVE_LOCK (src);
- src->priv->pending_eos = TRUE;
+ GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
+ /* now stop the unlock of the streaming thread again. Grabbing the live
+ * lock is enough because that protects the create function. */
+ if (bclass->unlock_stop)
+ bclass->unlock_stop (src);
GST_LIVE_UNLOCK (src);
+
result = TRUE;
break;
+ }
case GST_EVENT_NEWSEGMENT:
/* sending random NEWSEGMENT downstream can break sync. */
break;
case GST_EVENT_TAG:
- /* sending tags could be useful, FIXME insert in dataflow */
+ /* Insert tag in the dataflow */
+ GST_OBJECT_LOCK (src);
+ src->priv->pending_tags = g_list_append (src->priv->pending_tags, event);
+ GST_OBJECT_UNLOCK (src);
+ event = NULL;
+ result = TRUE;
break;
case GST_EVENT_BUFFERSIZE:
/* does not seem to make much sense currently */
@@ -1346,6 +1531,7 @@
GST_OBJECT_UNLOCK (src->srcpad);
if (started) {
+ GST_DEBUG_OBJECT (src, "performing seek");
/* when we are running in push mode, we can execute the
* seek right now, we need to unlock. */
result = gst_base_src_perform_seek (src, event, TRUE);
@@ -1355,6 +1541,7 @@
/* else we store the event and execute the seek when we
* get activated */
GST_OBJECT_LOCK (src);
+ GST_DEBUG_OBJECT (src, "queueing seek");
event_p = &src->data.ABI.pending_seek;
gst_event_replace ((GstEvent **) event_p, event);
GST_OBJECT_UNLOCK (src);
@@ -1408,6 +1595,17 @@
}
static gboolean
+gst_base_src_seekable (GstBaseSrc * src)
+{
+ GstBaseSrcClass *bclass;
+ bclass = GST_BASE_SRC_GET_CLASS (src);
+ if (bclass->is_seekable)
+ return bclass->is_seekable (src);
+ else
+ return FALSE;
+}
+
+static gboolean
gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
{
gboolean result;
@@ -1415,7 +1613,7 @@
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEEK:
/* is normally called when in push mode */
- if (!src->seekable)
+ if (!gst_base_src_seekable (src))
goto not_seekable;
result = gst_base_src_perform_seek (src, event, TRUE);
@@ -1481,7 +1679,7 @@
switch (prop_id) {
case PROP_BLOCKSIZE:
- src->blocksize = g_value_get_ulong (value);
+ gst_base_src_set_blocksize (src, g_value_get_ulong (value));
break;
case PROP_NUM_BUFFERS:
src->num_buffers = g_value_get_int (value);
@@ -1490,7 +1688,7 @@
src->data.ABI.typefind = g_value_get_boolean (value);
break;
case PROP_DO_TIMESTAMP:
- src->priv->do_timestamp = g_value_get_boolean (value);
+ gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -1508,7 +1706,7 @@
switch (prop_id) {
case PROP_BLOCKSIZE:
- g_value_set_ulong (value, src->blocksize);
+ g_value_set_ulong (value, gst_base_src_get_blocksize (src));
break;
case PROP_NUM_BUFFERS:
g_value_set_int (value, src->num_buffers);
@@ -1517,7 +1715,7 @@
g_value_set_boolean (value, src->data.ABI.typefind);
break;
case PROP_DO_TIMESTAMP:
- g_value_set_boolean (value, src->priv->do_timestamp);
+ g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -1810,11 +2008,28 @@
src->num_buffers_left--;
}
+ /* don't enter the create function if a pending EOS event was set. For the
+ * logic of the pending_eos, check the event function of this class. */
+ if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
+ goto eos;
+
GST_DEBUG_OBJECT (src,
"calling create offset %" G_GUINT64_FORMAT " length %u, time %"
G_GINT64_FORMAT, offset, length, src->segment.time);
ret = bclass->create (src, offset, length, buf);
+
+ /* The create function could be unlocked because we have a pending EOS. It's
+ * possible that we have a valid buffer from create that we need to
+ * discard when the create function returned _OK. */
+ if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
+ if (ret == GST_FLOW_OK) {
+ gst_buffer_unref (*buf);
+ *buf = NULL;
+ }
+ goto eos;
+ }
+
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto not_ok;
@@ -1823,6 +2038,10 @@
&& GST_BUFFER_TIMESTAMP (*buf) == -1)
GST_BUFFER_TIMESTAMP (*buf) = 0;
+ /* set pad caps on the buffer if the buffer had no caps */
+ if (GST_BUFFER_CAPS (*buf) == NULL)
+ gst_buffer_set_caps (*buf, GST_PAD_CAPS (src->srcpad));
+
/* now sync before pushing the buffer */
status = gst_base_src_do_sync (src, *buf);
@@ -1830,9 +2049,6 @@
if (G_UNLIKELY (src->priv->flushing))
goto flushing;
- if (G_UNLIKELY (src->priv->pending_eos))
- goto eos;
-
switch (status) {
case GST_CLOCK_EARLY:
/* the buffer is too late. We currently don't drop the buffer. */
@@ -1909,8 +2125,6 @@
eos:
{
GST_DEBUG_OBJECT (src, "we are EOS");
- gst_buffer_unref (*buf);
- *buf = NULL;
return GST_FLOW_UNEXPECTED;
}
}
@@ -1928,10 +2142,6 @@
if (G_UNLIKELY (src->priv->flushing))
goto flushing;
- /* if we're EOS, return right away */
- if (G_UNLIKELY (src->priv->pending_eos))
- goto eos;
-
res = gst_base_src_get_range (src, offset, length, buf);
done:
@@ -1948,12 +2158,6 @@
res = GST_FLOW_WRONG_STATE;
goto done;
}
-eos:
- {
- GST_DEBUG_OBJECT (src, "we are EOS");
- res = GST_FLOW_UNEXPECTED;
- goto done;
- }
}
static gboolean
@@ -2022,19 +2226,17 @@
gint64 position;
gboolean eos;
gulong blocksize;
+ GList *tags, *tmp;
eos = FALSE;
src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
GST_LIVE_LOCK (src);
+
if (G_UNLIKELY (src->priv->flushing))
goto flushing;
- /* if we're EOS, return right away */
- if (G_UNLIKELY (src->priv->pending_eos))
- goto eos;
-
src->priv->last_sent_eos = FALSE;
blocksize = src->blocksize;
@@ -2056,6 +2258,9 @@
} else
position = -1;
+ GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %lu",
+ GST_TIME_ARGS (position), blocksize);
+
ret = gst_base_src_get_range (src, position, blocksize, &buf);
if (G_UNLIKELY (ret != GST_FLOW_OK)) {
GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
@@ -2077,6 +2282,21 @@
src->priv->start_segment = NULL;
}
+ GST_OBJECT_LOCK (src);
+ /* take the tags */
+ tags = src->priv->pending_tags;
+ src->priv->pending_tags = NULL;
+ GST_OBJECT_UNLOCK (src);
+
+ /* Push out pending tags if any */
+ if (G_UNLIKELY (tags != NULL)) {
+ for (tmp = tags; tmp; tmp = g_list_next (tmp)) {
+ GstEvent *ev = (GstEvent *) tmp->data;
+ gst_pad_push_event (pad, ev);
+ }
+ g_list_free (tags);
+ }
+
/* figure out the new position */
switch (src->segment.format) {
case GST_FORMAT_BYTES:
@@ -2173,16 +2393,10 @@
ret = GST_FLOW_WRONG_STATE;
goto pause;
}
-eos:
- {
- GST_DEBUG_OBJECT (src, "we are EOS");
- GST_LIVE_UNLOCK (src);
- ret = GST_FLOW_UNEXPECTED;
- goto pause;
- }
pause:
{
const gchar *reason = gst_flow_get_name (ret);
+ GstEvent *event;
GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
src->data.ABI.running = FALSE;
@@ -2191,20 +2405,27 @@
if (ret == GST_FLOW_UNEXPECTED) {
/* perform EOS logic */
if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
- gst_element_post_message (GST_ELEMENT_CAST (src),
- gst_message_new_segment_done (GST_OBJECT_CAST (src),
- src->segment.format, src->segment.last_stop));
+ GstMessage *message;
+
+ message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
+ src->segment.format, src->segment.last_stop);
+ gst_message_set_seqnum (message, src->priv->seqnum);
+ gst_element_post_message (GST_ELEMENT_CAST (src), message);
} else {
- gst_pad_push_event (pad, gst_event_new_eos ());
+ event = gst_event_new_eos ();
+ gst_event_set_seqnum (event, src->priv->seqnum);
+ gst_pad_push_event (pad, event);
src->priv->last_sent_eos = TRUE;
}
} else {
+ event = gst_event_new_eos ();
+ gst_event_set_seqnum (event, src->priv->seqnum);
/* for fatal errors we post an error message, post the error
* first so the app knows about the error first. */
GST_ELEMENT_ERROR (src, STREAM, FAILED,
(_("Internal data flow error.")),
("streaming task paused, reason %s (%d)", reason, ret));
- gst_pad_push_event (pad, gst_event_new_eos ());
+ gst_pad_push_event (pad, event);
src->priv->last_sent_eos = TRUE;
}
}
@@ -2241,6 +2462,9 @@
if (thiscaps == NULL || gst_caps_is_any (thiscaps))
goto no_nego_needed;
+ if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
+ goto no_caps;
+
/* get the peer caps */
peercaps = gst_pad_peer_get_caps (GST_BASE_SRC_PAD (basesrc));
GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
@@ -2275,12 +2499,14 @@
* nego is not needed */
result = TRUE;
} else if (gst_caps_is_fixed (caps)) {
- /* yay, fixed caps, use those then */
- gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
- result = TRUE;
+ /* yay, fixed caps, use those then, it's possible that the subclass does
+ * not accept this caps after all and we have to fail. */
+ result = gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
}
}
gst_caps_unref (caps);
+ } else {
+ GST_DEBUG_OBJECT (basesrc, "no common caps");
}
return result;
@@ -2291,6 +2517,15 @@
gst_caps_unref (thiscaps);
return TRUE;
}
+no_caps:
+ {
+ GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
+ ("No supported formats found"),
+ ("This element did not produce valid caps"));
+ if (thiscaps)
+ gst_caps_unref (thiscaps);
+ return TRUE;
+ }
}
static gboolean
@@ -2313,6 +2548,7 @@
GstBaseSrcClass *bclass;
gboolean result;
guint64 size;
+ gboolean seekable;
if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
return TRUE;
@@ -2357,16 +2593,11 @@
G_GINT64_FORMAT, basesrc->segment.format, result, size,
basesrc->segment.duration);
- /* check if we can seek */
- if (bclass->is_seekable)
- basesrc->seekable = bclass->is_seekable (basesrc);
- else
- basesrc->seekable = FALSE;
-
- GST_DEBUG_OBJECT (basesrc, "is seekable: %d", basesrc->seekable);
+ seekable = gst_base_src_seekable (basesrc);
+ GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
/* update for random access flag */
- basesrc->random_access = basesrc->seekable &&
+ basesrc->random_access = seekable &&
basesrc->segment.format == GST_FORMAT_BYTES;
GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
@@ -2375,7 +2606,9 @@
if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) {
GstCaps *caps;
- caps = gst_type_find_helper (basesrc->srcpad, size);
+ if (!(caps = gst_type_find_helper (basesrc->srcpad, size)))
+ goto typefind_failed;
+
gst_pad_set_caps (basesrc->srcpad, caps);
gst_caps_unref (caps);
} else {
@@ -2402,6 +2635,14 @@
gst_base_src_stop (basesrc);
return FALSE;
}
+typefind_failed:
+ {
+ GST_DEBUG_OBJECT (basesrc, "could not typefind, stopping");
+ GST_ELEMENT_ERROR (basesrc, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
+ /* we must call stop */
+ gst_base_src_stop (basesrc);
+ return FALSE;
+ }
}
static gboolean
@@ -2452,8 +2693,9 @@
if (flushing) {
/* if we are locked in the live lock, signal it to make it flush */
basesrc->live_running = TRUE;
+
/* clear pending EOS if any */
- basesrc->priv->pending_eos = FALSE;
+ g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
/* step 1, now that we have the LIVE lock, clear our unlock request */
if (bclass->unlock_stop)
@@ -2696,7 +2938,7 @@
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
{
- GstEvent **event_p;
+ GstEvent **event_p, *event;
/* we don't need to unblock anything here, the pad deactivation code
* already did this */
@@ -2706,10 +2948,12 @@
* the EOS event to the element */
if (!basesrc->priv->last_sent_eos) {
GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
- gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ());
+ event = gst_event_new_eos ();
+ gst_event_set_seqnum (event, basesrc->priv->seqnum);
+ gst_pad_push_event (basesrc->srcpad, event);
basesrc->priv->last_sent_eos = TRUE;
}
- basesrc->priv->pending_eos = FALSE;
+ g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
event_p = &basesrc->data.ABI.pending_seek;
gst_event_replace (event_p, NULL);
event_p = &basesrc->priv->close_segment;