diff -r 71e347f905f2 -r 4a7fac7dd34a gst_plugins_base/gst/playback/gstqueue2.c --- a/gst_plugins_base/gst/playback/gstqueue2.c Fri Mar 19 09:35:09 2010 +0200 +++ b/gst_plugins_base/gst/playback/gstqueue2.c Fri Apr 16 15:15:52 2010 +0300 @@ -27,8 +27,8 @@ * @short_description: Asynchronous data queue. * * Data is queued until one of the limits specified by the - * #GstQueue:max-size-buffers, #GstQueue:max-size-bytes and/or - * #GstQueue:max-size-time properties has been reached. Any attempt to push + * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or + * #GstQueue2:max-size-time properties has been reached. Any attempt to push * more buffers into the queue will block the pushing thread until more space * becomes available. * @@ -36,16 +36,22 @@ * processing on sink and source pad. * * You can query how many buffers are queued by reading the - * #GstQueue:current-level-buffers property. + * #GstQueue2:current-level-buffers property. * * The default queue size limits are 100 buffers, 2MB of data, or * two seconds worth of data, whichever is reached first. * - * If you set temp-location, the element will buffer data on the file - * specified by it. By using this, it will buffer the entire - * stream data on the file independently of the queue size limits, they - * will only be used for buffering statistics. + * If you set temp-tmpl to a value such as /tmp/gstreamer-XXXXXX, the element + * will allocate a random free filename and buffer data in the file. + * By using this, it will buffer the entire stream data on the file independently + * of the queue size limits, they will only be used for buffering statistics. * + * Since 0.10.24, setting the temp-location property with a filename is deprecated + * because it's impossible to securely open a temporary file in this way. The + * property will still be used to notify the application of the allocated + * filename, though. + * + * Last reviewed on 2009-07-10 (0.10.24) */ #ifdef HAVE_CONFIG_H @@ -57,9 +63,16 @@ #include #include -#ifdef __SYMBIAN32__ -#include +#ifdef G_OS_WIN32 +#include /* lseek, open, close, read */ +#undef lseek +#define lseek _lseeki64 +#undef off_t +#define off_t guint64 +#else +#include #endif + static const GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue", "Generic", "Simple data queue", @@ -96,7 +109,7 @@ /* other defines */ #define DEFAULT_BUFFER_SIZE 4096 -#define QUEUE_IS_USING_TEMP_FILE(queue) (queue->temp_location != NULL) +#define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location_set || (queue)->temp_template != NULL) enum { @@ -111,6 +124,7 @@ PROP_USE_RATE_ESTIMATE, PROP_LOW_PERCENT, PROP_HIGH_PERCENT, + PROP_TEMP_TEMPLATE, PROP_TEMP_LOCATION }; @@ -199,10 +213,13 @@ GCond *item_del; /* signals space now available for writing */ /* temp location stuff */ + gchar *temp_template; + gboolean temp_location_set; gchar *temp_location; FILE *temp_file; guint64 writing_pos; guint64 reading_pos; + guint64 max_reading_pos; /* we need this to send the first new segment event of the stream * because we can't save it on the file */ gboolean segment_event_received; @@ -228,7 +245,7 @@ queue->cur_level.time, \ queue->max_level.time, \ (guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \ - queue->writing_pos - queue->reading_pos : \ + queue->writing_pos - queue->max_reading_pos : \ queue->queue->length)) #define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ @@ -367,50 +384,60 @@ g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES, g_param_spec_uint ("current-level-bytes", "Current level (kB)", "Current amount of data in the queue (bytes)", - 0, G_MAXUINT, 0, G_PARAM_READABLE)); + 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS, g_param_spec_uint ("current-level-buffers", "Current level (buffers)", "Current number of buffers in the queue", - 0, G_MAXUINT, 0, G_PARAM_READABLE)); + 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME, g_param_spec_uint64 ("current-level-time", "Current level (ns)", "Current amount of data in the queue (in ns)", - 0, G_MAXUINT64, 0, G_PARAM_READABLE)); + 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES, g_param_spec_uint ("max-size-bytes", "Max. size (kB)", "Max. amount of data in the queue (bytes, 0=disable)", - 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE)); + 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS, g_param_spec_uint ("max-size-buffers", "Max. size (buffers)", - "Max. number of buffers in the queue (0=disable)", - 0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE)); + "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT, + DEFAULT_MAX_SIZE_BUFFERS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME, g_param_spec_uint64 ("max-size-time", "Max. size (ns)", - "Max. amount of data in the queue (in ns, 0=disable)", - 0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE)); + "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64, + DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_USE_BUFFERING, g_param_spec_boolean ("use-buffering", "Use buffering", "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds", - DEFAULT_USE_BUFFERING, G_PARAM_READWRITE)); + DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE, g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate", "Estimate the bitrate of the stream to calculate time level", - DEFAULT_USE_RATE_ESTIMATE, G_PARAM_READWRITE)); + DEFAULT_USE_RATE_ESTIMATE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_LOW_PERCENT, g_param_spec_int ("low-percent", "Low percent", - "Low threshold for buffering to start", - 0, 100, DEFAULT_LOW_PERCENT, G_PARAM_READWRITE)); + "Low threshold for buffering to start", 0, 100, DEFAULT_LOW_PERCENT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT, g_param_spec_int ("high-percent", "High percent", - "High threshold for buffering to finish", - 0, 100, DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE)); + "High threshold for buffering to finish", 0, 100, + DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE, + g_param_spec_string ("temp-template", "Temporary File Template", + "File template to store temporary files in, should contain directory " + "and XXXXXX. (NULL == disabled)", + NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION, g_param_spec_string ("temp-location", "Temporary File Location", - "Location of a temporary file to store data in", - NULL, G_PARAM_READWRITE)); + "Location to store temporary files in (Deprecated: Only read this " + "property, use temp-tmpl to configure the name template)", + NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); gst_element_class_add_pad_template (gstelement_class, gst_static_pad_template_get (&srctemplate)); @@ -487,8 +514,9 @@ queue->queue = g_queue_new (); /* tempfile related */ + queue->temp_template = NULL; queue->temp_location = NULL; - queue->temp_file = NULL; + queue->temp_location_set = FALSE; GST_DEBUG_OBJECT (queue, "initialized queue's not_empty & not_full conditions"); @@ -516,8 +544,8 @@ g_timer_destroy (queue->out_timer); /* temp_file path cleanup */ - if (queue->temp_location != NULL) - g_free (queue->temp_location); + g_free (queue->temp_template); + g_free (queue->temp_location); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -689,15 +717,27 @@ } } if (post) { + GstMessage *message; + GstBufferingMode mode; + /* scale to high percent so that it becomes the 100% mark */ percent = percent * 100 / queue->high_percent; /* clip */ if (percent > 100) percent = 100; + if (QUEUE_IS_USING_TEMP_FILE (queue)) + mode = GST_BUFFERING_DOWNLOAD; + else + mode = GST_BUFFERING_STREAM; + GST_DEBUG_OBJECT (queue, "buffering %d percent", percent); - gst_element_post_message (GST_ELEMENT_CAST (queue), - gst_message_new_buffering (GST_OBJECT_CAST (queue), percent)); + message = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent); + gst_message_set_buffering_stats (message, mode, + queue->byte_in_rate, queue->byte_out_rate, -1); + + gst_element_post_message (GST_ELEMENT_CAST (queue), message); + } else { GST_DEBUG_OBJECT (queue, "filled %d percent", percent); } @@ -800,6 +840,10 @@ queue->last_out_elapsed = elapsed; queue->bytes_out = 0; } + if (queue->byte_in_rate > 0.0) { + queue->cur_level.rate_time = + queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND; + } GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT, queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time)); } @@ -811,7 +855,13 @@ guint8 *data; int ret; +#ifdef HAVE_FSEEKO + fseeko (queue->temp_file, (off_t) queue->writing_pos, SEEK_SET); +#elif defined (G_OS_UNIX) || defined (G_OS_WIN32) + lseek (fileno (queue->temp_file), (off_t) queue->writing_pos, SEEK_SET); +#else fseek (queue->temp_file, queue->writing_pos, SEEK_SET); +#endif data = GST_BUFFER_DATA (buffer); size = GST_BUFFER_SIZE (buffer); @@ -822,6 +872,11 @@ GST_ERROR_OBJECT (queue, "fwrite returned error"); } queue->writing_pos += size; + + if (queue->writing_pos > queue->max_reading_pos) + queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos; + else + queue->cur_level.bytes = 0; } /* see if there is enough data in the file to read a full buffer */ @@ -856,7 +911,7 @@ #ifdef HAVE_FSEEKO if (fseeko (queue->temp_file, (off_t) offset, SEEK_SET) != 0) goto seek_failed; -#elif defined (G_OS_UNIX) +#elif defined (G_OS_UNIX) || defined (G_OS_WIN32) if (lseek (fileno (queue->temp_file), (off_t) offset, SEEK_SET) == (off_t) - 1) goto seek_failed; @@ -889,6 +944,12 @@ *buffer = buf; queue->reading_pos = offset + length; + queue->max_reading_pos = MAX (queue->max_reading_pos, queue->reading_pos); + + if (queue->writing_pos > queue->max_reading_pos) + queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos; + else + queue->cur_level.bytes = 0; return GST_FLOW_OK; @@ -951,33 +1012,74 @@ static gboolean gst_queue_open_temp_location_file (GstQueue * queue) { - /* nothing to do */ - if (queue->temp_location == NULL) - goto no_filename; + gint fd = -1; + gchar *name = NULL; + + GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template); + + /* we have two cases: + * - temp_location was set to something !NULL (Deprecated). in this case we + * open the specified filename. + * - temp_template was set, allocate a filename and open that filename + */ + if (!queue->temp_location_set) { + /* nothing to do */ + if (queue->temp_template == NULL) + goto no_directory; - /* open the file for update/writing */ - queue->temp_file = g_fopen (queue->temp_location, "wb+"); - /* error creating file */ - if (queue->temp_file == NULL) - goto open_failed; + /* make copy of the template, we don't want to change this */ + name = g_strdup (queue->temp_template); + fd = g_mkstemp (name); + if (fd == -1) + goto mkstemp_failed; + + /* open the file for update/writing */ + queue->temp_file = fdopen (fd, "wb+"); + /* error creating file */ + if (queue->temp_file == NULL) + goto open_failed; + + g_free (queue->temp_location); + queue->temp_location = name; + + g_object_notify (G_OBJECT (queue), "temp-location"); + } else { + /* open the file for update/writing, this is deprecated but we still need to + * support it for API/ABI compatibility */ + queue->temp_file = g_fopen (queue->temp_location, "wb+"); + /* error creating file */ + if (queue->temp_file == NULL) + goto open_failed; + } queue->writing_pos = 0; queue->reading_pos = 0; + queue->max_reading_pos = 0; return TRUE; /* ERRORS */ -no_filename: +no_directory: { GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND, - (_("No file name specified.")), (NULL)); + (_("No Temp directory specified.")), (NULL)); + return FALSE; + } +mkstemp_failed: + { + GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ, + (_("Could not create temp file \"%s\"."), queue->temp_template), + GST_ERROR_SYSTEM); + g_free (name); return FALSE; } open_failed: { GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ, - (_("Could not open file \"%s\" for reading."), queue->temp_location), - GST_ERROR_SYSTEM); + (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM); + g_free (name); + if (fd != -1) + close (fd); return FALSE; } } @@ -989,6 +1091,8 @@ if (queue->temp_file == NULL) return; + GST_DEBUG_OBJECT (queue, "closing temp file"); + /* we don't remove the file so that the application can use it as a cache * later on */ fflush (queue->temp_file); @@ -998,11 +1102,25 @@ } static void +gst_queue_flush_temp_file (GstQueue * queue) +{ + if (queue->temp_file == NULL) + return; + + GST_DEBUG_OBJECT (queue, "flushing temp file"); + + queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file); + + queue->writing_pos = 0; + queue->reading_pos = 0; + queue->max_reading_pos = 0; +} + +static void gst_queue_locked_flush (GstQueue * queue) { if (QUEUE_IS_USING_TEMP_FILE (queue)) { - gst_queue_close_temp_location_file (queue); - gst_queue_open_temp_location_file (queue); + gst_queue_flush_temp_file (queue); } else { while (!g_queue_is_empty (queue->queue)) { GstMiniObject *data = g_queue_pop_head (queue->queue); @@ -1039,6 +1157,7 @@ queue->cur_level.buffers++; queue->cur_level.bytes += size; queue->bytes_in += size; + /* apply new buffer to segment stats */ apply_buffer (queue, buffer, &queue->sink_segment); /* update the byterate stats */ @@ -1068,7 +1187,10 @@ goto unexpected_event; queue->segment_event_received = TRUE; + if (queue->starting_segment != NULL) + gst_event_unref (queue->starting_segment); queue->starting_segment = event; + item = NULL; } /* a new segment allows us to accept more buffers if we got UNEXPECTED * from downstream */ @@ -1092,6 +1214,9 @@ if (!QUEUE_IS_USING_TEMP_FILE (queue)) g_queue_push_tail (queue->queue, item); + else + gst_mini_object_unref (GST_MINI_OBJECT_CAST (item)); + GST_QUEUE_SIGNAL_ADD (queue); } @@ -1266,7 +1391,7 @@ return FALSE; if (QUEUE_IS_USING_TEMP_FILE (queue)) { - return queue->writing_pos == queue->reading_pos; + return queue->writing_pos == queue->max_reading_pos; } else { if (queue->queue->length == 0) return TRUE; @@ -1288,6 +1413,10 @@ if (QUEUE_IS_USING_TEMP_FILE (queue)) return FALSE; + /* we are never filled when we have no buffers at all */ + if (queue->cur_level.buffers == 0) + return FALSE; + #define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \ (queue->cur_level.format) >= (queue->max_level.format)) @@ -1548,8 +1677,14 @@ event, GST_EVENT_TYPE_NAME (event)); #endif - /* just forward upstream */ - res = gst_pad_push_event (queue->sinkpad, event); + if (!QUEUE_IS_USING_TEMP_FILE (queue)) { + /* just forward upstream */ + res = gst_pad_push_event (queue->sinkpad, event); + } else { + /* when using a temp file, we unblock the pending read */ + res = TRUE; + gst_event_unref (event); + } return res; } @@ -1605,15 +1740,7 @@ } case GST_QUERY_DURATION: { - GST_DEBUG_OBJECT (queue, "waiting for preroll in duration query"); - - GST_QUEUE_MUTEX_LOCK (queue); - /* we have to wait until the upstream element is at least paused, which - * happened when we received a first item. */ - while (gst_queue_is_empty (queue)) { - GST_QUEUE_WAIT_ADD_CHECK (queue, flushing); - } - GST_QUEUE_MUTEX_UNLOCK (queue); + GST_DEBUG_OBJECT (queue, "doing peer query"); if (!gst_queue_peer_query (queue, queue->sinkpad, query)) goto peer_failed; @@ -1621,6 +1748,59 @@ GST_DEBUG_OBJECT (queue, "peer query success"); break; } + case GST_QUERY_BUFFERING: + { + GstFormat format; + + GST_DEBUG_OBJECT (queue, "query buffering"); + + if (!QUEUE_IS_USING_TEMP_FILE (queue)) { + /* no temp file, just forward to the peer */ + if (!gst_queue_peer_query (queue, queue->sinkpad, query)) + goto peer_failed; + GST_DEBUG_OBJECT (queue, "buffering forwarded to peer"); + } else { + gint64 start, stop; + + gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL); + + switch (format) { + case GST_FORMAT_PERCENT: + { + gint64 duration; + GstFormat peer_fmt; + + peer_fmt = GST_FORMAT_BYTES; + + if (!gst_pad_query_peer_duration (queue->sinkpad, &peer_fmt, + &duration)) + goto peer_failed; + + GST_DEBUG_OBJECT (queue, "duration %" G_GINT64_FORMAT ", writing %" + G_GINT64_FORMAT, duration, queue->writing_pos); + + start = 0; + /* get our available data relative to the duration */ + if (duration != -1) + stop = GST_FORMAT_PERCENT_MAX * queue->writing_pos / duration; + else + stop = -1; + break; + } + case GST_FORMAT_BYTES: + start = 0; + stop = queue->writing_pos; + break; + default: + start = -1; + stop = -1; + break; + } + gst_query_set_buffering_percent (query, queue->is_buffering, 100); + gst_query_set_buffering_range (query, format, start, stop, -1); + } + break; + } default: /* peer handled other queries */ if (!gst_queue_peer_query (queue, queue->sinkpad, query)) @@ -1636,12 +1816,6 @@ GST_DEBUG_OBJECT (queue, "failed peer query"); return FALSE; } -flushing: - { - GST_DEBUG_OBJECT (queue, "flushing while waiting for query"); - GST_QUEUE_MUTEX_UNLOCK (queue); - return FALSE; - } } static GstFlowReturn @@ -1681,8 +1855,10 @@ gboolean ret; queue = GST_QUEUE (gst_pad_get_parent (pad)); + /* we can operate in pull mode when we are using a tempfile */ ret = QUEUE_IS_USING_TEMP_FILE (queue); + gst_object_unref (GST_OBJECT (queue)); return ret; @@ -1856,8 +2032,8 @@ #define QUEUE_THRESHOLD_CHANGE(q)\ g_cond_signal (queue->item_add); -static gboolean -gst_queue_set_temp_location (GstQueue * queue, const gchar * location) +static void +gst_queue_set_temp_template (GstQueue * queue, const gchar * template) { GstState state; @@ -1869,19 +2045,16 @@ GST_OBJECT_UNLOCK (queue); /* set new location */ - g_free (queue->temp_location); - queue->temp_location = g_strdup (location); + g_free (queue->temp_template); + queue->temp_template = g_strdup (template); - g_object_notify (G_OBJECT (queue), "temp-location"); - - return TRUE; + return; /* ERROR */ wrong_state: { - GST_DEBUG_OBJECT (queue, "setting temp-location in wrong state"); + GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state"); GST_OBJECT_UNLOCK (queue); - return FALSE; } } @@ -1923,8 +2096,15 @@ case PROP_HIGH_PERCENT: queue->high_percent = g_value_get_int (value); break; + case PROP_TEMP_TEMPLATE: + gst_queue_set_temp_template (queue, g_value_get_string (value)); + break; case PROP_TEMP_LOCATION: - gst_queue_set_temp_location (queue, g_value_dup_string (value)); + g_free (queue->temp_location); + queue->temp_location = g_value_dup_string (value); + /* you can set the property back to NULL to make it use the temp-tmpl + * property. */ + queue->temp_location_set = queue->temp_location != NULL; break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -1973,6 +2153,9 @@ case PROP_HIGH_PERCENT: g_value_set_int (value, queue->high_percent); break; + case PROP_TEMP_TEMPLATE: + g_value_set_string (value, queue->temp_template); + break; case PROP_TEMP_LOCATION: g_value_set_string (value, queue->temp_location); break; @@ -1995,6 +2178,7 @@ GST_DEBUG ("binding text domain %s to locale dir %s", GETTEXT_PACKAGE, LOCALEDIR); bindtextdomain (GETTEXT_PACKAGE, LOCALEDIR); + bind_textdomain_codeset (GETTEXT_PACKAGE, "UTF-8"); #endif /* ENABLE_NLS */ return gst_element_register (plugin, "queue2", GST_RANK_NONE, GST_TYPE_QUEUE);