diff -r 9b2c3c7a1a9c -r 567bb019e3e3 gstreamer_core/libs/gst/base/gstbasesrc.c --- a/gstreamer_core/libs/gst/base/gstbasesrc.c Wed Mar 31 22:03:18 2010 +0300 +++ b/gstreamer_core/libs/gst/base/gstbasesrc.c Tue Aug 31 15:30:33 2010 +0300 @@ -33,14 +33,11 @@ * live sources * * - * - * * The source can be configured to operate in any #GstFormat with the * gst_base_src_set_format() method. The currently set format determines * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES. - * - * + * * #GstBaseSrc always supports push mode scheduling. If the following * conditions are met, it also supports pull mode scheduling: * @@ -49,12 +46,10 @@ * #GstBaseSrc::is_seekable returns %TRUE. * * - * - * + * * Since 0.10.9, any #GstBaseSrc can enable pull based scheduling at any * time by overriding #GstBaseSrc::check_get_range so that it returns %TRUE. - * - * + * * If all the conditions are met for operating in pull mode, #GstBaseSrc is * automatically seekable in push mode as well. The following conditions must * be met to make the element seekable in push mode when the format is not @@ -71,40 +66,34 @@ * #GstBaseSrc::do_seek is implemented, performs the seek and returns %TRUE. * * - * - * + * * When the element does not meet the requirements to operate in pull mode, * the offset and length in the #GstBaseSrc::create method should be ignored. * It is recommended to subclass #GstPushSrc instead, in this situation. If the * element can operate in pull mode but only with specific offsets and * lengths, it is allowed to generate an error when the wrong values are passed * to the #GstBaseSrc::create function. - * - * + * * #GstBaseSrc has support for live sources. Live sources are sources that when * paused discard data, such as audio or video capture devices. A typical live * source also produces data at a fixed rate and thus provides a clock to publish * this rate. * Use gst_base_src_set_live() to activate the live source mode. - * - * + * * A live source does not produce data in the PAUSED state. This means that the * #GstBaseSrc::create method will not be called in PAUSED but only in PLAYING. * To signal the pipeline that the element will not produce data, the return * value from the READY to PAUSED state will be #GST_STATE_CHANGE_NO_PREROLL. - * - * + * * A typical live source will timestamp the buffers it creates with the * current running time of the pipeline. This is one reason why a live source * can only produce data in the PLAYING state, when the clock is actually * distributed and running. - * - * + * * Live sources that synchronize and block on the clock (an audio source, for * example) can since 0.10.12 use gst_base_src_wait_playing() when the ::create * function was interrupted by a state change to PAUSED. - * - * + * * The #GstBaseSrc::get_times method can be used to implement pseudo-live * sources. * It only makes sense to implement the ::get_times function if the source is @@ -113,21 +102,17 @@ * the timestamps are transformed into the current running_time. * The base source will then wait for the calculated running_time before pushing * out the buffer. - * - * + * * For live sources, the base class will by default report a latency of 0. * For pseudo live sources, the base class will by default measure the difference * between the first buffer timestamp and the start time of get_times and will * report this value as the latency. * Subclasses should override the query function when this behaviour is not * acceptable. - * - * + * * There is only support in #GstBaseSrc for exactly one source pad, which * should be named "src". A source implementation (subclass of #GstBaseSrc) * should install a pad template in its class_init function, like so: - * - * * * static void * my_element_class_init (GstMyElementClass *klass) @@ -141,7 +126,8 @@ * gst_element_class_set_details (gstelement_class, &details); * } * - * + * + * * Controlled shutdown of live sources in applications * * Applications that record from a live source may want to stop recording @@ -152,22 +138,19 @@ * event down the pipeline. The application would then wait for an * EOS message posted on the pipeline's bus to know when all data has * been processed and the pipeline can safely be stopped. - * - * + * * Since GStreamer 0.10.16 an application may send an EOS event to a source - * element to make it send an EOS event downstream. This can typically be done + * element to make it perform the EOS logic (send EOS event downstream or post a + * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done * with the gst_element_send_event() function on the element or its parent bin. - * - * + * * After the EOS has been sent to the element, the application should wait for * an EOS message to be posted on the pipeline's bus. Once this EOS message is * received, it may safely shut down the entire pipeline. - * - * + * * The old behaviour for controlled shutdown introduced since GStreamer 0.10.3 * is still available but deprecated as it is dangerous and less flexible. - * - * + * * Last reviewed on 2007-12-19 (0.10.16) * * @@ -243,8 +226,8 @@ GstEvent *close_segment; GstEvent *start_segment; - /* if EOS is pending */ - gboolean pending_eos; + /* if EOS is pending (atomic) */ + gint pending_eos; /* startup latency is the time it takes between going to PLAYING and producing * the first BUFFER with running_time 0. This value is included in the latency @@ -255,6 +238,12 @@ GstClockTimeDiff ts_offset; gboolean do_timestamp; + + /* stream sequence number */ + guint32 seqnum; + + /* pending tags to be pushed in the data stream */ + GList *pending_tags; }; static GstElementClass *parent_class = NULL; @@ -272,9 +261,10 @@ GType gst_base_src_get_type (void) { - static GType base_src_type = 0; - - if (G_UNLIKELY (base_src_type == 0)) { + static volatile gsize base_src_type = 0; + + if (g_once_init_enter (&base_src_type)) { + GType _type; static const GTypeInfo base_src_info = { sizeof (GstBaseSrcClass), (GBaseInitFunc) gst_base_src_base_init, @@ -287,11 +277,13 @@ (GInstanceInitFunc) gst_base_src_init, }; - base_src_type = g_type_register_static (GST_TYPE_ELEMENT, + _type = g_type_register_static (GST_TYPE_ELEMENT, "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT); + g_once_init_leave (&base_src_type, _type); } return base_src_type; } + static GstCaps *gst_base_src_getcaps (GstPad * pad); static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps); static void gst_base_src_fixate (GstPad * pad, GstCaps * caps); @@ -331,6 +323,7 @@ guint length, GstBuffer ** buf); static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length, GstBuffer ** buf); +static gboolean gst_base_src_seekable (GstBaseSrc * src); static void gst_base_src_base_init (gpointer g_class) @@ -357,20 +350,21 @@ g_object_class_install_property (gobject_class, PROP_BLOCKSIZE, g_param_spec_ulong ("blocksize", "Block size", - "Size in bytes to read per buffer (0 = default)", 0, G_MAXULONG, - DEFAULT_BLOCKSIZE, G_PARAM_READWRITE)); + "Size in bytes to read per buffer (-1 = default)", 0, G_MAXULONG, + DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS, g_param_spec_int ("num-buffers", "num-buffers", - "Number of buffers to output before sending EOS", -1, G_MAXINT, - DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE)); + "Number of buffers to output before sending EOS (-1 = unlimited)", + -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_TYPEFIND, g_param_spec_boolean ("typefind", "Typefind", "Run typefind before negotiating", DEFAULT_TYPEFIND, - G_PARAM_READWRITE)); + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP, g_param_spec_boolean ("do-timestamp", "Do timestamp", "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP, - G_PARAM_READWRITE)); + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_base_src_change_state); @@ -460,6 +454,11 @@ event_p = &basesrc->data.ABI.pending_seek; gst_event_replace (event_p, NULL); + if (basesrc->priv->pending_tags) { + g_list_foreach (basesrc->priv->pending_tags, (GFunc) gst_event_unref, NULL); + g_list_free (basesrc->priv->pending_tags); + } + G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -488,6 +487,8 @@ GstFlowReturn gst_base_src_wait_playing (GstBaseSrc * src) { + g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR); + /* block until the state changes, or we get a flush, or something */ GST_DEBUG_OBJECT (src, "live source waiting for running state"); GST_LIVE_WAIT (src); @@ -526,6 +527,8 @@ void gst_base_src_set_live (GstBaseSrc * src, gboolean live) { + g_return_if_fail (GST_IS_BASE_SRC (src)); + GST_OBJECT_LOCK (src); src->is_live = live; GST_OBJECT_UNLOCK (src); @@ -548,6 +551,8 @@ { gboolean result; + g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE); + GST_OBJECT_LOCK (src); result = src->is_live; GST_OBJECT_UNLOCK (src); @@ -575,6 +580,8 @@ void gst_base_src_set_format (GstBaseSrc * src, GstFormat format) { + g_return_if_fail (GST_IS_BASE_SRC (src)); + gst_segment_init (&src->segment, format); } @@ -606,6 +613,8 @@ { GstClockTime min; + g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE); + GST_OBJECT_LOCK (src); if (live) *live = src->is_live; @@ -632,6 +641,59 @@ } /** + * gst_base_src_set_blocksize: + * @src: the source + * @blocksize: the new blocksize in bytes + * + * Set the number of bytes that @src will push out with each buffer. When + * @blocksize is set to -1, a default length will be used. + * + * Since: 0.10.22 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +void +gst_base_src_set_blocksize (GstBaseSrc * src, gulong blocksize) +{ + g_return_if_fail (GST_IS_BASE_SRC (src)); + + GST_OBJECT_LOCK (src); + src->blocksize = blocksize; + GST_OBJECT_UNLOCK (src); +} + +/** + * gst_base_src_get_blocksize: + * @src: the source + * + * Get the number of bytes that @src will push out with each buffer. + * + * Returns: the number of bytes pushed with each buffer. + * + * Since: 0.10.22 + */ +#ifdef __SYMBIAN32__ +EXPORT_C +#endif + +gulong +gst_base_src_get_blocksize (GstBaseSrc * src) +{ + gulong res; + + g_return_val_if_fail (GST_IS_BASE_SRC (src), 0); + + GST_OBJECT_LOCK (src); + res = src->blocksize; + GST_OBJECT_UNLOCK (src); + + return res; +} + + +/** * gst_base_src_set_do_timestamp: * @src: the source * @timestamp: enable or disable timestamping @@ -649,6 +711,8 @@ void gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp) { + g_return_if_fail (GST_IS_BASE_SRC (src)); + GST_OBJECT_LOCK (src); src->priv->do_timestamp = timestamp; GST_OBJECT_UNLOCK (src); @@ -673,6 +737,8 @@ { gboolean res; + g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE); + GST_OBJECT_LOCK (src); res = src->priv->do_timestamp; GST_OBJECT_UNLOCK (src); @@ -797,6 +863,7 @@ GST_DEBUG_OBJECT (src, "duration query in format %s", gst_format_get_name (format)); + switch (format) { case GST_FORMAT_PERCENT: gst_query_set_duration (query, GST_FORMAT_PERCENT, @@ -832,9 +899,20 @@ case GST_QUERY_SEEKING: { - gst_query_set_seeking (query, src->segment.format, - src->seekable, 0, src->segment.duration); - res = TRUE; + GstFormat format; + + gst_query_parse_seeking (query, &format, NULL, NULL, NULL); + if (format == src->segment.format) { + gst_query_set_seeking (query, src->segment.format, + gst_base_src_seekable (src), 0, src->segment.duration); + res = TRUE; + } else { + /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */ + /* Don't reply to the query to make up for demuxers which don't + * handle the SEEKING query yet. Players like Totem will fall back + * to the duration when the SEEKING query isn't answered. */ + res = FALSE; + } break; } case GST_QUERY_SEGMENT: @@ -899,6 +977,46 @@ } case GST_QUERY_JITTER: case GST_QUERY_RATE: + res = FALSE; + break; + case GST_QUERY_BUFFERING: + { + GstFormat format; + gint64 start, stop, estimated; + + gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL); + + GST_DEBUG_OBJECT (src, "buffering query in format %s", + gst_format_get_name (format)); + + if (src->random_access) { + estimated = 0; + start = 0; + if (format == GST_FORMAT_PERCENT) + stop = GST_FORMAT_PERCENT_MAX; + else + stop = src->segment.duration; + } else { + estimated = -1; + start = -1; + stop = -1; + } + /* convert to required format. When the conversion fails, we can't answer + * the query. When the value is unknown, we can don't perform conversion + * but report TRUE. */ + if (format != GST_FORMAT_PERCENT && stop != -1) { + res = gst_pad_query_convert (src->srcpad, src->segment.format, + stop, &format, &stop); + } else { + res = TRUE; + } + if (res && format != GST_FORMAT_PERCENT && start != -1) + res = gst_pad_query_convert (src->srcpad, src->segment.format, + start, &format, &start); + + gst_query_set_buffering_range (query, format, start, stop, estimated); + break; + } default: res = FALSE; break; @@ -940,9 +1058,10 @@ } else if (segment->start == 0) { /* seek to start, we can implement a default for this. */ segment->time = 0; - res = TRUE; - } else + } else { res = FALSE; + GST_INFO_OBJECT (src, "Can't do a default seek"); + } return res; } @@ -1096,7 +1215,7 @@ static gboolean gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock) { - gboolean res = TRUE; + gboolean res = TRUE, tres; gdouble rate; GstFormat seek_format, dest_format; GstSeekFlags flags; @@ -1107,6 +1226,8 @@ gboolean relative_seek = FALSE; gboolean seekseg_configured = FALSE; GstSegment seeksegment; + guint32 seqnum; + GstEvent *tevent; GST_DEBUG_OBJECT (src, "doing seek"); @@ -1122,7 +1243,8 @@ if (dest_format != seek_format && !relative_seek) { /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it * here before taking the stream lock, otherwise we must convert it later, - * once we have the stream lock and can read the current position */ + * once we have the stream lock and can read the last configures segment + * start and stop positions */ gst_segment_init (&seeksegment, dest_format); if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) @@ -1132,14 +1254,19 @@ } flush = flags & GST_SEEK_FLAG_FLUSH; + seqnum = gst_event_get_seqnum (event); } else { flush = FALSE; + /* get next seqnum */ + seqnum = gst_util_seqnum_next (); } /* send flush start */ - if (flush) - gst_pad_push_event (src->srcpad, gst_event_new_flush_start ()); - else + if (flush) { + tevent = gst_event_new_flush_start (); + gst_event_set_seqnum (tevent, seqnum); + gst_pad_push_event (src->srcpad, tevent); + } else gst_pad_pause_task (src->srcpad); /* unblock streaming thread. */ @@ -1149,6 +1276,14 @@ * because the task is paused, our streaming thread stopped * or because our peer is flushing. */ GST_PAD_STREAM_LOCK (src->srcpad); + if (G_UNLIKELY (src->priv->seqnum == seqnum)) { + /* we have seen this event before, issue a warning for now */ + GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT, + seqnum); + } else { + src->priv->seqnum = seqnum; + GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum); + } gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL); @@ -1191,9 +1326,11 @@ /* and prepare to continue streaming */ if (flush) { + tevent = gst_event_new_flush_stop (); + gst_event_set_seqnum (tevent, seqnum); /* send flush stop, peer will accept data and events again. We * are not yet providing data as we still have the STREAM_LOCK. */ - gst_pad_push_event (src->srcpad, gst_event_new_flush_stop ()); + gst_pad_push_event (src->srcpad, tevent); } else if (res && src->data.ABI.running) { /* we are running the current segment and doing a non-flushing seek, * close the segment first based on the last_stop. */ @@ -1207,6 +1344,7 @@ gst_event_new_new_segment_full (TRUE, src->segment.rate, src->segment.applied_rate, src->segment.format, src->segment.start, src->segment.last_stop, src->segment.time); + gst_event_set_seqnum (src->priv->close_segment, seqnum); } /* The subclass must have converted the segment to the processing format @@ -1223,9 +1361,13 @@ memcpy (&src->segment, &seeksegment, sizeof (GstSegment)); if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) { - gst_element_post_message (GST_ELEMENT (src), - gst_message_new_segment_start (GST_OBJECT (src), - src->segment.format, src->segment.last_stop)); + GstMessage *message; + + message = gst_message_new_segment_start (GST_OBJECT (src), + src->segment.format, src->segment.last_stop); + gst_message_set_seqnum (message, seqnum); + + gst_element_post_message (GST_ELEMENT (src), message); } /* for deriving a stop position for the playback segment from the seek @@ -1247,20 +1389,23 @@ src->segment.rate, src->segment.applied_rate, src->segment.format, src->segment.last_stop, stop, src->segment.time); } else { - /* reverse, we send data from stop to last_stop */ + /* reverse, we send data from last_stop to start */ src->priv->start_segment = gst_event_new_new_segment_full (FALSE, src->segment.rate, src->segment.applied_rate, src->segment.format, src->segment.start, src->segment.last_stop, src->segment.time); } + gst_event_set_seqnum (src->priv->start_segment, seqnum); } src->priv->discont = TRUE; src->data.ABI.running = TRUE; /* and restart the task in case it got paused explicitely or by * the FLUSH_START event we pushed out. */ - gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop, + tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop, src->srcpad); + if (res && !tres) + res = FALSE; /* and release the lock again so we can continue streaming */ GST_PAD_STREAM_UNLOCK (src->srcpad); @@ -1304,6 +1449,8 @@ src = GST_BASE_SRC (element); + GST_DEBUG_OBJECT (src, "reveived %s event", GST_EVENT_TYPE_NAME (event)); + switch (GST_EVENT_TYPE (event)) { /* bidirectional events */ case GST_EVENT_FLUSH_START: @@ -1314,18 +1461,56 @@ /* downstream serialized events */ case GST_EVENT_EOS: - /* queue EOS and make sure the task or pull function - * performs the EOS actions. */ + { + GstBaseSrcClass *bclass; + + bclass = GST_BASE_SRC_GET_CLASS (src); + + /* queue EOS and make sure the task or pull function performs the EOS + * actions. + * + * We have two possibilities: + * + * - Before we are to enter the _create function, we check the pending_eos + * first and do EOS instead of entering it. + * - If we are in the _create function or we did not manage to set the + * flag fast enough and we are about to enter the _create function, + * we unlock it so that we exit with WRONG_STATE immediatly. We then + * check the EOS flag and do the EOS logic. + */ + g_atomic_int_set (&src->priv->pending_eos, TRUE); + GST_DEBUG_OBJECT (src, "EOS marked, calling unlock"); + + /* unlock the _create function so that we can check the pending_eos flag + * and we can do EOS. This will eventually release the LIVE_LOCK again so + * that we can grab it and stop the unlock again. We don't take the stream + * lock so that this operation is guaranteed to never block. */ + if (bclass->unlock) + bclass->unlock (src); + + GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK"); + GST_LIVE_LOCK (src); - src->priv->pending_eos = TRUE; + GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop"); + /* now stop the unlock of the streaming thread again. Grabbing the live + * lock is enough because that protects the create function. */ + if (bclass->unlock_stop) + bclass->unlock_stop (src); GST_LIVE_UNLOCK (src); + result = TRUE; break; + } case GST_EVENT_NEWSEGMENT: /* sending random NEWSEGMENT downstream can break sync. */ break; case GST_EVENT_TAG: - /* sending tags could be useful, FIXME insert in dataflow */ + /* Insert tag in the dataflow */ + GST_OBJECT_LOCK (src); + src->priv->pending_tags = g_list_append (src->priv->pending_tags, event); + GST_OBJECT_UNLOCK (src); + event = NULL; + result = TRUE; break; case GST_EVENT_BUFFERSIZE: /* does not seem to make much sense currently */ @@ -1346,6 +1531,7 @@ GST_OBJECT_UNLOCK (src->srcpad); if (started) { + GST_DEBUG_OBJECT (src, "performing seek"); /* when we are running in push mode, we can execute the * seek right now, we need to unlock. */ result = gst_base_src_perform_seek (src, event, TRUE); @@ -1355,6 +1541,7 @@ /* else we store the event and execute the seek when we * get activated */ GST_OBJECT_LOCK (src); + GST_DEBUG_OBJECT (src, "queueing seek"); event_p = &src->data.ABI.pending_seek; gst_event_replace ((GstEvent **) event_p, event); GST_OBJECT_UNLOCK (src); @@ -1408,6 +1595,17 @@ } static gboolean +gst_base_src_seekable (GstBaseSrc * src) +{ + GstBaseSrcClass *bclass; + bclass = GST_BASE_SRC_GET_CLASS (src); + if (bclass->is_seekable) + return bclass->is_seekable (src); + else + return FALSE; +} + +static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event) { gboolean result; @@ -1415,7 +1613,7 @@ switch (GST_EVENT_TYPE (event)) { case GST_EVENT_SEEK: /* is normally called when in push mode */ - if (!src->seekable) + if (!gst_base_src_seekable (src)) goto not_seekable; result = gst_base_src_perform_seek (src, event, TRUE); @@ -1481,7 +1679,7 @@ switch (prop_id) { case PROP_BLOCKSIZE: - src->blocksize = g_value_get_ulong (value); + gst_base_src_set_blocksize (src, g_value_get_ulong (value)); break; case PROP_NUM_BUFFERS: src->num_buffers = g_value_get_int (value); @@ -1490,7 +1688,7 @@ src->data.ABI.typefind = g_value_get_boolean (value); break; case PROP_DO_TIMESTAMP: - src->priv->do_timestamp = g_value_get_boolean (value); + gst_base_src_set_do_timestamp (src, g_value_get_boolean (value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -1508,7 +1706,7 @@ switch (prop_id) { case PROP_BLOCKSIZE: - g_value_set_ulong (value, src->blocksize); + g_value_set_ulong (value, gst_base_src_get_blocksize (src)); break; case PROP_NUM_BUFFERS: g_value_set_int (value, src->num_buffers); @@ -1517,7 +1715,7 @@ g_value_set_boolean (value, src->data.ABI.typefind); break; case PROP_DO_TIMESTAMP: - g_value_set_boolean (value, src->priv->do_timestamp); + g_value_set_boolean (value, gst_base_src_get_do_timestamp (src)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -1810,11 +2008,28 @@ src->num_buffers_left--; } + /* don't enter the create function if a pending EOS event was set. For the + * logic of the pending_eos, check the event function of this class. */ + if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) + goto eos; + GST_DEBUG_OBJECT (src, "calling create offset %" G_GUINT64_FORMAT " length %u, time %" G_GINT64_FORMAT, offset, length, src->segment.time); ret = bclass->create (src, offset, length, buf); + + /* The create function could be unlocked because we have a pending EOS. It's + * possible that we have a valid buffer from create that we need to + * discard when the create function returned _OK. */ + if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) { + if (ret == GST_FLOW_OK) { + gst_buffer_unref (*buf); + *buf = NULL; + } + goto eos; + } + if (G_UNLIKELY (ret != GST_FLOW_OK)) goto not_ok; @@ -1823,6 +2038,10 @@ && GST_BUFFER_TIMESTAMP (*buf) == -1) GST_BUFFER_TIMESTAMP (*buf) = 0; + /* set pad caps on the buffer if the buffer had no caps */ + if (GST_BUFFER_CAPS (*buf) == NULL) + gst_buffer_set_caps (*buf, GST_PAD_CAPS (src->srcpad)); + /* now sync before pushing the buffer */ status = gst_base_src_do_sync (src, *buf); @@ -1830,9 +2049,6 @@ if (G_UNLIKELY (src->priv->flushing)) goto flushing; - if (G_UNLIKELY (src->priv->pending_eos)) - goto eos; - switch (status) { case GST_CLOCK_EARLY: /* the buffer is too late. We currently don't drop the buffer. */ @@ -1909,8 +2125,6 @@ eos: { GST_DEBUG_OBJECT (src, "we are EOS"); - gst_buffer_unref (*buf); - *buf = NULL; return GST_FLOW_UNEXPECTED; } } @@ -1928,10 +2142,6 @@ if (G_UNLIKELY (src->priv->flushing)) goto flushing; - /* if we're EOS, return right away */ - if (G_UNLIKELY (src->priv->pending_eos)) - goto eos; - res = gst_base_src_get_range (src, offset, length, buf); done: @@ -1948,12 +2158,6 @@ res = GST_FLOW_WRONG_STATE; goto done; } -eos: - { - GST_DEBUG_OBJECT (src, "we are EOS"); - res = GST_FLOW_UNEXPECTED; - goto done; - } } static gboolean @@ -2022,19 +2226,17 @@ gint64 position; gboolean eos; gulong blocksize; + GList *tags, *tmp; eos = FALSE; src = GST_BASE_SRC (GST_OBJECT_PARENT (pad)); GST_LIVE_LOCK (src); + if (G_UNLIKELY (src->priv->flushing)) goto flushing; - /* if we're EOS, return right away */ - if (G_UNLIKELY (src->priv->pending_eos)) - goto eos; - src->priv->last_sent_eos = FALSE; blocksize = src->blocksize; @@ -2056,6 +2258,9 @@ } else position = -1; + GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %lu", + GST_TIME_ARGS (position), blocksize); + ret = gst_base_src_get_range (src, position, blocksize, &buf); if (G_UNLIKELY (ret != GST_FLOW_OK)) { GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s", @@ -2077,6 +2282,21 @@ src->priv->start_segment = NULL; } + GST_OBJECT_LOCK (src); + /* take the tags */ + tags = src->priv->pending_tags; + src->priv->pending_tags = NULL; + GST_OBJECT_UNLOCK (src); + + /* Push out pending tags if any */ + if (G_UNLIKELY (tags != NULL)) { + for (tmp = tags; tmp; tmp = g_list_next (tmp)) { + GstEvent *ev = (GstEvent *) tmp->data; + gst_pad_push_event (pad, ev); + } + g_list_free (tags); + } + /* figure out the new position */ switch (src->segment.format) { case GST_FORMAT_BYTES: @@ -2173,16 +2393,10 @@ ret = GST_FLOW_WRONG_STATE; goto pause; } -eos: - { - GST_DEBUG_OBJECT (src, "we are EOS"); - GST_LIVE_UNLOCK (src); - ret = GST_FLOW_UNEXPECTED; - goto pause; - } pause: { const gchar *reason = gst_flow_get_name (ret); + GstEvent *event; GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason); src->data.ABI.running = FALSE; @@ -2191,20 +2405,27 @@ if (ret == GST_FLOW_UNEXPECTED) { /* perform EOS logic */ if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) { - gst_element_post_message (GST_ELEMENT_CAST (src), - gst_message_new_segment_done (GST_OBJECT_CAST (src), - src->segment.format, src->segment.last_stop)); + GstMessage *message; + + message = gst_message_new_segment_done (GST_OBJECT_CAST (src), + src->segment.format, src->segment.last_stop); + gst_message_set_seqnum (message, src->priv->seqnum); + gst_element_post_message (GST_ELEMENT_CAST (src), message); } else { - gst_pad_push_event (pad, gst_event_new_eos ()); + event = gst_event_new_eos (); + gst_event_set_seqnum (event, src->priv->seqnum); + gst_pad_push_event (pad, event); src->priv->last_sent_eos = TRUE; } } else { + event = gst_event_new_eos (); + gst_event_set_seqnum (event, src->priv->seqnum); /* for fatal errors we post an error message, post the error * first so the app knows about the error first. */ GST_ELEMENT_ERROR (src, STREAM, FAILED, (_("Internal data flow error.")), ("streaming task paused, reason %s (%d)", reason, ret)); - gst_pad_push_event (pad, gst_event_new_eos ()); + gst_pad_push_event (pad, event); src->priv->last_sent_eos = TRUE; } } @@ -2241,6 +2462,9 @@ if (thiscaps == NULL || gst_caps_is_any (thiscaps)) goto no_nego_needed; + if (G_UNLIKELY (gst_caps_is_empty (thiscaps))) + goto no_caps; + /* get the peer caps */ peercaps = gst_pad_peer_get_caps (GST_BASE_SRC_PAD (basesrc)); GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps); @@ -2275,12 +2499,14 @@ * nego is not needed */ result = TRUE; } else if (gst_caps_is_fixed (caps)) { - /* yay, fixed caps, use those then */ - gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps); - result = TRUE; + /* yay, fixed caps, use those then, it's possible that the subclass does + * not accept this caps after all and we have to fail. */ + result = gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps); } } gst_caps_unref (caps); + } else { + GST_DEBUG_OBJECT (basesrc, "no common caps"); } return result; @@ -2291,6 +2517,15 @@ gst_caps_unref (thiscaps); return TRUE; } +no_caps: + { + GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT, + ("No supported formats found"), + ("This element did not produce valid caps")); + if (thiscaps) + gst_caps_unref (thiscaps); + return TRUE; + } } static gboolean @@ -2313,6 +2548,7 @@ GstBaseSrcClass *bclass; gboolean result; guint64 size; + gboolean seekable; if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED)) return TRUE; @@ -2357,16 +2593,11 @@ G_GINT64_FORMAT, basesrc->segment.format, result, size, basesrc->segment.duration); - /* check if we can seek */ - if (bclass->is_seekable) - basesrc->seekable = bclass->is_seekable (basesrc); - else - basesrc->seekable = FALSE; - - GST_DEBUG_OBJECT (basesrc, "is seekable: %d", basesrc->seekable); + seekable = gst_base_src_seekable (basesrc); + GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable); /* update for random access flag */ - basesrc->random_access = basesrc->seekable && + basesrc->random_access = seekable && basesrc->segment.format == GST_FORMAT_BYTES; GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access); @@ -2375,7 +2606,9 @@ if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) { GstCaps *caps; - caps = gst_type_find_helper (basesrc->srcpad, size); + if (!(caps = gst_type_find_helper (basesrc->srcpad, size))) + goto typefind_failed; + gst_pad_set_caps (basesrc->srcpad, caps); gst_caps_unref (caps); } else { @@ -2402,6 +2635,14 @@ gst_base_src_stop (basesrc); return FALSE; } +typefind_failed: + { + GST_DEBUG_OBJECT (basesrc, "could not typefind, stopping"); + GST_ELEMENT_ERROR (basesrc, STREAM, TYPE_NOT_FOUND, (NULL), (NULL)); + /* we must call stop */ + gst_base_src_stop (basesrc); + return FALSE; + } } static gboolean @@ -2452,8 +2693,9 @@ if (flushing) { /* if we are locked in the live lock, signal it to make it flush */ basesrc->live_running = TRUE; + /* clear pending EOS if any */ - basesrc->priv->pending_eos = FALSE; + g_atomic_int_set (&basesrc->priv->pending_eos, FALSE); /* step 1, now that we have the LIVE lock, clear our unlock request */ if (bclass->unlock_stop) @@ -2696,7 +2938,7 @@ break; case GST_STATE_CHANGE_PAUSED_TO_READY: { - GstEvent **event_p; + GstEvent **event_p, *event; /* we don't need to unblock anything here, the pad deactivation code * already did this */ @@ -2706,10 +2948,12 @@ * the EOS event to the element */ if (!basesrc->priv->last_sent_eos) { GST_DEBUG_OBJECT (basesrc, "Sending EOS event"); - gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ()); + event = gst_event_new_eos (); + gst_event_set_seqnum (event, basesrc->priv->seqnum); + gst_pad_push_event (basesrc->srcpad, event); basesrc->priv->last_sent_eos = TRUE; } - basesrc->priv->pending_eos = FALSE; + g_atomic_int_set (&basesrc->priv->pending_eos, FALSE); event_p = &basesrc->data.ABI.pending_seek; gst_event_replace (event_p, NULL); event_p = &basesrc->priv->close_segment;