branch | RCL_3 |
changeset 30 | 7e817e7e631c |
parent 29 | 567bb019e3e3 |
29:567bb019e3e3 | 30:7e817e7e631c |
---|---|
25 /** |
25 /** |
26 * SECTION:element-queue2 |
26 * SECTION:element-queue2 |
27 * @short_description: Asynchronous data queue. |
27 * @short_description: Asynchronous data queue. |
28 * |
28 * |
29 * Data is queued until one of the limits specified by the |
29 * Data is queued until one of the limits specified by the |
30 * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or |
30 * #GstQueue:max-size-buffers, #GstQueue:max-size-bytes and/or |
31 * #GstQueue2:max-size-time properties has been reached. Any attempt to push |
31 * #GstQueue:max-size-time properties has been reached. Any attempt to push |
32 * more buffers into the queue will block the pushing thread until more space |
32 * more buffers into the queue will block the pushing thread until more space |
33 * becomes available. |
33 * becomes available. |
34 * |
34 * |
35 * The queue will create a new thread on the source pad to decouple the |
35 * The queue will create a new thread on the source pad to decouple the |
36 * processing on sink and source pad. |
36 * processing on sink and source pad. |
37 * |
37 * |
38 * You can query how many buffers are queued by reading the |
38 * You can query how many buffers are queued by reading the |
39 * #GstQueue2:current-level-buffers property. |
39 * #GstQueue:current-level-buffers property. |
40 * |
40 * |
41 * The default queue size limits are 100 buffers, 2MB of data, or |
41 * The default queue size limits are 100 buffers, 2MB of data, or |
42 * two seconds worth of data, whichever is reached first. |
42 * two seconds worth of data, whichever is reached first. |
43 * |
43 * |
44 * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element |
44 * If you set temp-location, the element will buffer data on the file |
45 * will allocate a random free filename and buffer data in the file. |
45 * specified by it. By using this, it will buffer the entire |
46 * By using this, it will buffer the entire stream data on the file independently |
46 * stream data on the file independently of the queue size limits, they |
47 * of the queue size limits, they will only be used for buffering statistics. |
47 * will only be used for buffering statistics. |
48 * |
48 * |
49 * Since 0.10.24, setting the temp-location property with a filename is deprecated |
|
50 * because it's impossible to securely open a temporary file in this way. The |
|
51 * property will still be used to notify the application of the allocated |
|
52 * filename, though. |
|
53 * |
|
54 * Last reviewed on 2009-07-10 (0.10.24) |
|
55 */ |
49 */ |
56 |
50 |
57 #ifdef HAVE_CONFIG_H |
51 #ifdef HAVE_CONFIG_H |
58 #include "config.h" |
52 #include "config.h" |
59 #endif |
53 #endif |
61 #include <glib/gstdio.h> |
55 #include <glib/gstdio.h> |
62 |
56 |
63 #include <gst/gst.h> |
57 #include <gst/gst.h> |
64 #include <gst/gst-i18n-plugin.h> |
58 #include <gst/gst-i18n-plugin.h> |
65 |
59 |
66 #ifdef G_OS_WIN32 |
60 #ifdef __SYMBIAN32__ |
67 #include <io.h> /* lseek, open, close, read */ |
61 #include <glib_global.h> |
68 #undef lseek |
|
69 #define lseek _lseeki64 |
|
70 #undef off_t |
|
71 #define off_t guint64 |
|
72 #else |
|
73 #include <unistd.h> |
|
74 #endif |
62 #endif |
75 |
|
76 static const GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue", |
63 static const GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue", |
77 "Generic", |
64 "Generic", |
78 "Simple data queue", |
65 "Simple data queue", |
79 "Erik Walthinsen <omega@cse.ogi.edu>, " |
66 "Erik Walthinsen <omega@cse.ogi.edu>, " |
80 "Wim Taymans <wim.taymans@gmail.com>"); |
67 "Wim Taymans <wim.taymans@gmail.com>"); |
107 #define DEFAULT_LOW_PERCENT 10 |
94 #define DEFAULT_LOW_PERCENT 10 |
108 #define DEFAULT_HIGH_PERCENT 99 |
95 #define DEFAULT_HIGH_PERCENT 99 |
109 |
96 |
110 /* other defines */ |
97 /* other defines */ |
111 #define DEFAULT_BUFFER_SIZE 4096 |
98 #define DEFAULT_BUFFER_SIZE 4096 |
112 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location_set || (queue)->temp_template != NULL) |
99 #define QUEUE_IS_USING_TEMP_FILE(queue) (queue->temp_location != NULL) |
113 |
100 |
114 enum |
101 enum |
115 { |
102 { |
116 PROP_0, |
103 PROP_0, |
117 PROP_CUR_LEVEL_BUFFERS, |
104 PROP_CUR_LEVEL_BUFFERS, |
122 PROP_MAX_SIZE_TIME, |
109 PROP_MAX_SIZE_TIME, |
123 PROP_USE_BUFFERING, |
110 PROP_USE_BUFFERING, |
124 PROP_USE_RATE_ESTIMATE, |
111 PROP_USE_RATE_ESTIMATE, |
125 PROP_LOW_PERCENT, |
112 PROP_LOW_PERCENT, |
126 PROP_HIGH_PERCENT, |
113 PROP_HIGH_PERCENT, |
127 PROP_TEMP_TEMPLATE, |
|
128 PROP_TEMP_LOCATION |
114 PROP_TEMP_LOCATION |
129 }; |
115 }; |
130 |
116 |
131 #define GST_TYPE_QUEUE \ |
117 #define GST_TYPE_QUEUE \ |
132 (gst_queue_get_type()) |
118 (gst_queue_get_type()) |
211 GCond *item_add; /* signals buffers now available for reading */ |
197 GCond *item_add; /* signals buffers now available for reading */ |
212 gboolean waiting_del; |
198 gboolean waiting_del; |
213 GCond *item_del; /* signals space now available for writing */ |
199 GCond *item_del; /* signals space now available for writing */ |
214 |
200 |
215 /* temp location stuff */ |
201 /* temp location stuff */ |
216 gchar *temp_template; |
|
217 gboolean temp_location_set; |
|
218 gchar *temp_location; |
202 gchar *temp_location; |
219 FILE *temp_file; |
203 FILE *temp_file; |
220 guint64 writing_pos; |
204 guint64 writing_pos; |
221 guint64 reading_pos; |
205 guint64 reading_pos; |
222 guint64 max_reading_pos; |
|
223 /* we need this to send the first new segment event of the stream |
206 /* we need this to send the first new segment event of the stream |
224 * because we can't save it on the file */ |
207 * because we can't save it on the file */ |
225 gboolean segment_event_received; |
208 gboolean segment_event_received; |
226 GstEvent *starting_segment; |
209 GstEvent *starting_segment; |
227 |
210 |
243 queue->cur_level.bytes, \ |
226 queue->cur_level.bytes, \ |
244 queue->max_level.bytes, \ |
227 queue->max_level.bytes, \ |
245 queue->cur_level.time, \ |
228 queue->cur_level.time, \ |
246 queue->max_level.time, \ |
229 queue->max_level.time, \ |
247 (guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \ |
230 (guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \ |
248 queue->writing_pos - queue->max_reading_pos : \ |
231 queue->writing_pos - queue->reading_pos : \ |
249 queue->queue->length)) |
232 queue->queue->length)) |
250 |
233 |
251 #define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ |
234 #define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ |
252 g_mutex_lock (q->qlock); \ |
235 g_mutex_lock (q->qlock); \ |
253 } G_STMT_END |
236 } G_STMT_END |
382 |
365 |
383 /* properties */ |
366 /* properties */ |
384 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES, |
367 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES, |
385 g_param_spec_uint ("current-level-bytes", "Current level (kB)", |
368 g_param_spec_uint ("current-level-bytes", "Current level (kB)", |
386 "Current amount of data in the queue (bytes)", |
369 "Current amount of data in the queue (bytes)", |
387 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); |
370 0, G_MAXUINT, 0, G_PARAM_READABLE)); |
388 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS, |
371 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS, |
389 g_param_spec_uint ("current-level-buffers", "Current level (buffers)", |
372 g_param_spec_uint ("current-level-buffers", "Current level (buffers)", |
390 "Current number of buffers in the queue", |
373 "Current number of buffers in the queue", |
391 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); |
374 0, G_MAXUINT, 0, G_PARAM_READABLE)); |
392 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME, |
375 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME, |
393 g_param_spec_uint64 ("current-level-time", "Current level (ns)", |
376 g_param_spec_uint64 ("current-level-time", "Current level (ns)", |
394 "Current amount of data in the queue (in ns)", |
377 "Current amount of data in the queue (in ns)", |
395 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); |
378 0, G_MAXUINT64, 0, G_PARAM_READABLE)); |
396 |
379 |
397 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES, |
380 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES, |
398 g_param_spec_uint ("max-size-bytes", "Max. size (kB)", |
381 g_param_spec_uint ("max-size-bytes", "Max. size (kB)", |
399 "Max. amount of data in the queue (bytes, 0=disable)", |
382 "Max. amount of data in the queue (bytes, 0=disable)", |
400 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, |
383 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE)); |
401 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
402 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS, |
384 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS, |
403 g_param_spec_uint ("max-size-buffers", "Max. size (buffers)", |
385 g_param_spec_uint ("max-size-buffers", "Max. size (buffers)", |
404 "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT, |
386 "Max. number of buffers in the queue (0=disable)", |
405 DEFAULT_MAX_SIZE_BUFFERS, |
387 0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE)); |
406 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
407 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME, |
388 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME, |
408 g_param_spec_uint64 ("max-size-time", "Max. size (ns)", |
389 g_param_spec_uint64 ("max-size-time", "Max. size (ns)", |
409 "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64, |
390 "Max. amount of data in the queue (in ns, 0=disable)", |
410 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
391 0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE)); |
411 |
392 |
412 g_object_class_install_property (gobject_class, PROP_USE_BUFFERING, |
393 g_object_class_install_property (gobject_class, PROP_USE_BUFFERING, |
413 g_param_spec_boolean ("use-buffering", "Use buffering", |
394 g_param_spec_boolean ("use-buffering", "Use buffering", |
414 "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds", |
395 "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds", |
415 DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
396 DEFAULT_USE_BUFFERING, G_PARAM_READWRITE)); |
416 g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE, |
397 g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE, |
417 g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate", |
398 g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate", |
418 "Estimate the bitrate of the stream to calculate time level", |
399 "Estimate the bitrate of the stream to calculate time level", |
419 DEFAULT_USE_RATE_ESTIMATE, |
400 DEFAULT_USE_RATE_ESTIMATE, G_PARAM_READWRITE)); |
420 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
421 g_object_class_install_property (gobject_class, PROP_LOW_PERCENT, |
401 g_object_class_install_property (gobject_class, PROP_LOW_PERCENT, |
422 g_param_spec_int ("low-percent", "Low percent", |
402 g_param_spec_int ("low-percent", "Low percent", |
423 "Low threshold for buffering to start", 0, 100, DEFAULT_LOW_PERCENT, |
403 "Low threshold for buffering to start", |
424 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
404 0, 100, DEFAULT_LOW_PERCENT, G_PARAM_READWRITE)); |
425 g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT, |
405 g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT, |
426 g_param_spec_int ("high-percent", "High percent", |
406 g_param_spec_int ("high-percent", "High percent", |
427 "High threshold for buffering to finish", 0, 100, |
407 "High threshold for buffering to finish", |
428 DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
408 0, 100, DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE)); |
429 |
|
430 g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE, |
|
431 g_param_spec_string ("temp-template", "Temporary File Template", |
|
432 "File template to store temporary files in, should contain directory " |
|
433 "and XXXXXX. (NULL == disabled)", |
|
434 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
435 |
409 |
436 g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION, |
410 g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION, |
437 g_param_spec_string ("temp-location", "Temporary File Location", |
411 g_param_spec_string ("temp-location", "Temporary File Location", |
438 "Location to store temporary files in (Deprecated: Only read this " |
412 "Location of a temporary file to store data in", |
439 "property, use temp-tmpl to configure the name template)", |
413 NULL, G_PARAM_READWRITE)); |
440 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
441 |
414 |
442 gst_element_class_add_pad_template (gstelement_class, |
415 gst_element_class_add_pad_template (gstelement_class, |
443 gst_static_pad_template_get (&srctemplate)); |
416 gst_static_pad_template_get (&srctemplate)); |
444 gst_element_class_add_pad_template (gstelement_class, |
417 gst_element_class_add_pad_template (gstelement_class, |
445 gst_static_pad_template_get (&sinktemplate)); |
418 gst_static_pad_template_get (&sinktemplate)); |
512 queue->waiting_del = FALSE; |
485 queue->waiting_del = FALSE; |
513 queue->item_del = g_cond_new (); |
486 queue->item_del = g_cond_new (); |
514 queue->queue = g_queue_new (); |
487 queue->queue = g_queue_new (); |
515 |
488 |
516 /* tempfile related */ |
489 /* tempfile related */ |
517 queue->temp_template = NULL; |
|
518 queue->temp_location = NULL; |
490 queue->temp_location = NULL; |
519 queue->temp_location_set = FALSE; |
491 queue->temp_file = NULL; |
520 |
492 |
521 GST_DEBUG_OBJECT (queue, |
493 GST_DEBUG_OBJECT (queue, |
522 "initialized queue's not_empty & not_full conditions"); |
494 "initialized queue's not_empty & not_full conditions"); |
523 } |
495 } |
524 |
496 |
542 g_cond_free (queue->item_del); |
514 g_cond_free (queue->item_del); |
543 g_timer_destroy (queue->in_timer); |
515 g_timer_destroy (queue->in_timer); |
544 g_timer_destroy (queue->out_timer); |
516 g_timer_destroy (queue->out_timer); |
545 |
517 |
546 /* temp_file path cleanup */ |
518 /* temp_file path cleanup */ |
547 g_free (queue->temp_template); |
519 if (queue->temp_location != NULL) |
548 g_free (queue->temp_location); |
520 g_free (queue->temp_location); |
549 |
521 |
550 G_OBJECT_CLASS (parent_class)->finalize (object); |
522 G_OBJECT_CLASS (parent_class)->finalize (object); |
551 } |
523 } |
552 |
524 |
553 static GstCaps * |
525 static GstCaps * |
715 queue->buffering_iteration++; |
687 queue->buffering_iteration++; |
716 post = TRUE; |
688 post = TRUE; |
717 } |
689 } |
718 } |
690 } |
719 if (post) { |
691 if (post) { |
720 GstMessage *message; |
|
721 GstBufferingMode mode; |
|
722 |
|
723 /* scale to high percent so that it becomes the 100% mark */ |
692 /* scale to high percent so that it becomes the 100% mark */ |
724 percent = percent * 100 / queue->high_percent; |
693 percent = percent * 100 / queue->high_percent; |
725 /* clip */ |
694 /* clip */ |
726 if (percent > 100) |
695 if (percent > 100) |
727 percent = 100; |
696 percent = 100; |
728 |
697 |
729 if (QUEUE_IS_USING_TEMP_FILE (queue)) |
|
730 mode = GST_BUFFERING_DOWNLOAD; |
|
731 else |
|
732 mode = GST_BUFFERING_STREAM; |
|
733 |
|
734 GST_DEBUG_OBJECT (queue, "buffering %d percent", percent); |
698 GST_DEBUG_OBJECT (queue, "buffering %d percent", percent); |
735 message = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent); |
699 gst_element_post_message (GST_ELEMENT_CAST (queue), |
736 gst_message_set_buffering_stats (message, mode, |
700 gst_message_new_buffering (GST_OBJECT_CAST (queue), percent)); |
737 queue->byte_in_rate, queue->byte_out_rate, -1); |
|
738 |
|
739 gst_element_post_message (GST_ELEMENT_CAST (queue), message); |
|
740 |
|
741 } else { |
701 } else { |
742 GST_DEBUG_OBJECT (queue, "filled %d percent", percent); |
702 GST_DEBUG_OBJECT (queue, "filled %d percent", percent); |
743 } |
703 } |
744 |
704 |
745 #undef GET_PERCENT |
705 #undef GET_PERCENT |
838 |
798 |
839 /* reset the values to calculate rate over the next interval */ |
799 /* reset the values to calculate rate over the next interval */ |
840 queue->last_out_elapsed = elapsed; |
800 queue->last_out_elapsed = elapsed; |
841 queue->bytes_out = 0; |
801 queue->bytes_out = 0; |
842 } |
802 } |
843 if (queue->byte_in_rate > 0.0) { |
|
844 queue->cur_level.rate_time = |
|
845 queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND; |
|
846 } |
|
847 GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT, |
803 GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT, |
848 queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time)); |
804 queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time)); |
849 } |
805 } |
850 |
806 |
851 static void |
807 static void |
853 { |
809 { |
854 guint size; |
810 guint size; |
855 guint8 *data; |
811 guint8 *data; |
856 int ret; |
812 int ret; |
857 |
813 |
858 #ifdef HAVE_FSEEKO |
|
859 fseeko (queue->temp_file, (off_t) queue->writing_pos, SEEK_SET); |
|
860 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32) |
|
861 lseek (fileno (queue->temp_file), (off_t) queue->writing_pos, SEEK_SET); |
|
862 #else |
|
863 fseek (queue->temp_file, queue->writing_pos, SEEK_SET); |
814 fseek (queue->temp_file, queue->writing_pos, SEEK_SET); |
864 #endif |
|
865 |
815 |
866 data = GST_BUFFER_DATA (buffer); |
816 data = GST_BUFFER_DATA (buffer); |
867 size = GST_BUFFER_SIZE (buffer); |
817 size = GST_BUFFER_SIZE (buffer); |
868 |
818 |
869 ret = fwrite (data, 1, size, queue->temp_file); |
819 ret = fwrite (data, 1, size, queue->temp_file); |
870 if (ret < size) { |
820 if (ret < size) { |
871 /* FIXME do something useful here */ |
821 /* FIXME do something useful here */ |
872 GST_ERROR_OBJECT (queue, "fwrite returned error"); |
822 GST_ERROR_OBJECT (queue, "fwrite returned error"); |
873 } |
823 } |
874 queue->writing_pos += size; |
824 queue->writing_pos += size; |
875 |
|
876 if (queue->writing_pos > queue->max_reading_pos) |
|
877 queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos; |
|
878 else |
|
879 queue->cur_level.bytes = 0; |
|
880 } |
825 } |
881 |
826 |
882 /* see if there is enough data in the file to read a full buffer */ |
827 /* see if there is enough data in the file to read a full buffer */ |
883 static gboolean |
828 static gboolean |
884 gst_queue_have_data (GstQueue * queue, guint64 offset, guint length) |
829 gst_queue_have_data (GstQueue * queue, guint64 offset, guint length) |
909 } |
854 } |
910 |
855 |
911 #ifdef HAVE_FSEEKO |
856 #ifdef HAVE_FSEEKO |
912 if (fseeko (queue->temp_file, (off_t) offset, SEEK_SET) != 0) |
857 if (fseeko (queue->temp_file, (off_t) offset, SEEK_SET) != 0) |
913 goto seek_failed; |
858 goto seek_failed; |
914 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32) |
859 #elif defined (G_OS_UNIX) |
915 if (lseek (fileno (queue->temp_file), (off_t) offset, |
860 if (lseek (fileno (queue->temp_file), (off_t) offset, |
916 SEEK_SET) == (off_t) - 1) |
861 SEEK_SET) == (off_t) - 1) |
917 goto seek_failed; |
862 goto seek_failed; |
918 #else |
863 #else |
919 if (fseek (queue->temp_file, (long) offset, SEEK_SET) != 0) |
864 if (fseek (queue->temp_file, (long) offset, SEEK_SET) != 0) |
942 GST_BUFFER_OFFSET_END (buf) = offset + length; |
887 GST_BUFFER_OFFSET_END (buf) = offset + length; |
943 |
888 |
944 *buffer = buf; |
889 *buffer = buf; |
945 |
890 |
946 queue->reading_pos = offset + length; |
891 queue->reading_pos = offset + length; |
947 queue->max_reading_pos = MAX (queue->max_reading_pos, queue->reading_pos); |
|
948 |
|
949 if (queue->writing_pos > queue->max_reading_pos) |
|
950 queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos; |
|
951 else |
|
952 queue->cur_level.bytes = 0; |
|
953 |
892 |
954 return GST_FLOW_OK; |
893 return GST_FLOW_OK; |
955 |
894 |
956 /* ERRORS */ |
895 /* ERRORS */ |
957 out_flushing: |
896 out_flushing: |
1010 } |
949 } |
1011 |
950 |
1012 static gboolean |
951 static gboolean |
1013 gst_queue_open_temp_location_file (GstQueue * queue) |
952 gst_queue_open_temp_location_file (GstQueue * queue) |
1014 { |
953 { |
1015 gint fd = -1; |
954 /* nothing to do */ |
1016 gchar *name = NULL; |
955 if (queue->temp_location == NULL) |
1017 |
956 goto no_filename; |
1018 GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template); |
957 |
1019 |
958 /* open the file for update/writing */ |
1020 /* we have two cases: |
959 queue->temp_file = g_fopen (queue->temp_location, "wb+"); |
1021 * - temp_location was set to something !NULL (Deprecated). in this case we |
960 /* error creating file */ |
1022 * open the specified filename. |
961 if (queue->temp_file == NULL) |
1023 * - temp_template was set, allocate a filename and open that filename |
962 goto open_failed; |
1024 */ |
|
1025 if (!queue->temp_location_set) { |
|
1026 /* nothing to do */ |
|
1027 if (queue->temp_template == NULL) |
|
1028 goto no_directory; |
|
1029 |
|
1030 /* make copy of the template, we don't want to change this */ |
|
1031 name = g_strdup (queue->temp_template); |
|
1032 fd = g_mkstemp (name); |
|
1033 if (fd == -1) |
|
1034 goto mkstemp_failed; |
|
1035 |
|
1036 /* open the file for update/writing */ |
|
1037 queue->temp_file = fdopen (fd, "wb+"); |
|
1038 /* error creating file */ |
|
1039 if (queue->temp_file == NULL) |
|
1040 goto open_failed; |
|
1041 |
|
1042 g_free (queue->temp_location); |
|
1043 queue->temp_location = name; |
|
1044 |
|
1045 g_object_notify (G_OBJECT (queue), "temp-location"); |
|
1046 } else { |
|
1047 /* open the file for update/writing, this is deprecated but we still need to |
|
1048 * support it for API/ABI compatibility */ |
|
1049 queue->temp_file = g_fopen (queue->temp_location, "wb+"); |
|
1050 /* error creating file */ |
|
1051 if (queue->temp_file == NULL) |
|
1052 goto open_failed; |
|
1053 } |
|
1054 |
963 |
1055 queue->writing_pos = 0; |
964 queue->writing_pos = 0; |
1056 queue->reading_pos = 0; |
965 queue->reading_pos = 0; |
1057 queue->max_reading_pos = 0; |
|
1058 |
966 |
1059 return TRUE; |
967 return TRUE; |
1060 |
968 |
1061 /* ERRORS */ |
969 /* ERRORS */ |
1062 no_directory: |
970 no_filename: |
1063 { |
971 { |
1064 GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND, |
972 GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND, |
1065 (_("No Temp directory specified.")), (NULL)); |
973 (_("No file name specified.")), (NULL)); |
1066 return FALSE; |
|
1067 } |
|
1068 mkstemp_failed: |
|
1069 { |
|
1070 GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ, |
|
1071 (_("Could not create temp file \"%s\"."), queue->temp_template), |
|
1072 GST_ERROR_SYSTEM); |
|
1073 g_free (name); |
|
1074 return FALSE; |
974 return FALSE; |
1075 } |
975 } |
1076 open_failed: |
976 open_failed: |
1077 { |
977 { |
1078 GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ, |
978 GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ, |
1079 (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM); |
979 (_("Could not open file \"%s\" for reading."), queue->temp_location), |
1080 g_free (name); |
980 GST_ERROR_SYSTEM); |
1081 if (fd != -1) |
|
1082 close (fd); |
|
1083 return FALSE; |
981 return FALSE; |
1084 } |
982 } |
1085 } |
983 } |
1086 |
984 |
1087 static void |
985 static void |
1088 gst_queue_close_temp_location_file (GstQueue * queue) |
986 gst_queue_close_temp_location_file (GstQueue * queue) |
1089 { |
987 { |
1090 /* nothing to do */ |
988 /* nothing to do */ |
1091 if (queue->temp_file == NULL) |
989 if (queue->temp_file == NULL) |
1092 return; |
990 return; |
1093 |
|
1094 GST_DEBUG_OBJECT (queue, "closing temp file"); |
|
1095 |
991 |
1096 /* NOTE: the file is removed right after it is closed (see remove() below), so |
992 /* NOTE: the file is removed right after it is closed (see remove() below), so |
1097 * the application cannot use it as a cache later on */ |
993 * the application cannot use it as a cache later on */ |
1098 fflush (queue->temp_file); |
994 fflush (queue->temp_file); |
1099 fclose (queue->temp_file); |
995 fclose (queue->temp_file); |
1100 remove (queue->temp_location); |
996 remove (queue->temp_location); |
1101 queue->temp_file = NULL; |
997 queue->temp_file = NULL; |
1102 } |
998 } |
1103 |
999 |
1104 static void |
1000 static void |
1105 gst_queue_flush_temp_file (GstQueue * queue) |
|
1106 { |
|
1107 if (queue->temp_file == NULL) |
|
1108 return; |
|
1109 |
|
1110 GST_DEBUG_OBJECT (queue, "flushing temp file"); |
|
1111 |
|
1112 queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file); |
|
1113 |
|
1114 queue->writing_pos = 0; |
|
1115 queue->reading_pos = 0; |
|
1116 queue->max_reading_pos = 0; |
|
1117 } |
|
1118 |
|
1119 static void |
|
1120 gst_queue_locked_flush (GstQueue * queue) |
1001 gst_queue_locked_flush (GstQueue * queue) |
1121 { |
1002 { |
1122 if (QUEUE_IS_USING_TEMP_FILE (queue)) { |
1003 if (QUEUE_IS_USING_TEMP_FILE (queue)) { |
1123 gst_queue_flush_temp_file (queue); |
1004 gst_queue_close_temp_location_file (queue); |
1005 gst_queue_open_temp_location_file (queue); |
|
1124 } else { |
1006 } else { |
1125 while (!g_queue_is_empty (queue->queue)) { |
1007 while (!g_queue_is_empty (queue->queue)) { |
1126 GstMiniObject *data = g_queue_pop_head (queue->queue); |
1008 GstMiniObject *data = g_queue_pop_head (queue->queue); |
1127 |
1009 |
1128 /* Then lose another reference because we are supposed to destroy that |
1010 /* Then lose another reference because we are supposed to destroy that |
1155 |
1037 |
1156 /* add buffer to the statistics */ |
1038 /* add buffer to the statistics */ |
1157 queue->cur_level.buffers++; |
1039 queue->cur_level.buffers++; |
1158 queue->cur_level.bytes += size; |
1040 queue->cur_level.bytes += size; |
1159 queue->bytes_in += size; |
1041 queue->bytes_in += size; |
1160 |
|
1161 /* apply new buffer to segment stats */ |
1042 /* apply new buffer to segment stats */ |
1162 apply_buffer (queue, buffer, &queue->sink_segment); |
1043 apply_buffer (queue, buffer, &queue->sink_segment); |
1163 /* update the byterate stats */ |
1044 /* update the byterate stats */ |
1164 update_in_rates (queue); |
1045 update_in_rates (queue); |
1165 |
1046 |
1185 if (QUEUE_IS_USING_TEMP_FILE (queue)) { |
1066 if (QUEUE_IS_USING_TEMP_FILE (queue)) { |
1186 if (queue->segment_event_received) |
1067 if (queue->segment_event_received) |
1187 goto unexpected_event; |
1068 goto unexpected_event; |
1188 |
1069 |
1189 queue->segment_event_received = TRUE; |
1070 queue->segment_event_received = TRUE; |
1190 if (queue->starting_segment != NULL) |
|
1191 gst_event_unref (queue->starting_segment); |
|
1192 queue->starting_segment = event; |
1071 queue->starting_segment = event; |
1193 item = NULL; |
|
1194 } |
1072 } |
1195 /* a new segment allows us to accept more buffers if we got UNEXPECTED |
1073 /* a new segment allows us to accept more buffers if we got UNEXPECTED |
1196 * from downstream */ |
1074 * from downstream */ |
1197 queue->unexpected = FALSE; |
1075 queue->unexpected = FALSE; |
1198 break; |
1076 break; |
1212 /* update the buffering status */ |
1090 /* update the buffering status */ |
1213 update_buffering (queue); |
1091 update_buffering (queue); |
1214 |
1092 |
1215 if (!QUEUE_IS_USING_TEMP_FILE (queue)) |
1093 if (!QUEUE_IS_USING_TEMP_FILE (queue)) |
1216 g_queue_push_tail (queue->queue, item); |
1094 g_queue_push_tail (queue->queue, item); |
1217 else |
|
1218 gst_mini_object_unref (GST_MINI_OBJECT_CAST (item)); |
|
1219 |
|
1220 GST_QUEUE_SIGNAL_ADD (queue); |
1095 GST_QUEUE_SIGNAL_ADD (queue); |
1221 } |
1096 } |
1222 |
1097 |
1223 return; |
1098 return; |
1224 |
1099 |
1389 /* never empty on EOS */ |
1264 /* never empty on EOS */ |
1390 if (queue->is_eos) |
1265 if (queue->is_eos) |
1391 return FALSE; |
1266 return FALSE; |
1392 |
1267 |
1393 if (QUEUE_IS_USING_TEMP_FILE (queue)) { |
1268 if (QUEUE_IS_USING_TEMP_FILE (queue)) { |
1394 return queue->writing_pos == queue->max_reading_pos; |
1269 return queue->writing_pos == queue->reading_pos; |
1395 } else { |
1270 } else { |
1396 if (queue->queue->length == 0) |
1271 if (queue->queue->length == 0) |
1397 return TRUE; |
1272 return TRUE; |
1398 } |
1273 } |
1399 |
1274 |
1409 if (queue->is_eos) |
1284 if (queue->is_eos) |
1410 return TRUE; |
1285 return TRUE; |
1411 |
1286 |
1412 /* if using file, we're never filled if we don't have EOS */ |
1287 /* if using file, we're never filled if we don't have EOS */ |
1413 if (QUEUE_IS_USING_TEMP_FILE (queue)) |
1288 if (QUEUE_IS_USING_TEMP_FILE (queue)) |
1414 return FALSE; |
|
1415 |
|
1416 /* we are never filled when we have no buffers at all */ |
|
1417 if (queue->cur_level.buffers == 0) |
|
1418 return FALSE; |
1289 return FALSE; |
1419 |
1290 |
1420 #define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \ |
1291 #define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \ |
1421 (queue->cur_level.format) >= (queue->max_level.format)) |
1292 (queue->cur_level.format) >= (queue->max_level.format)) |
1422 |
1293 |
1675 #ifndef GST_DISABLE_GST_DEBUG |
1546 #ifndef GST_DISABLE_GST_DEBUG |
1676 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)", |
1547 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)", |
1677 event, GST_EVENT_TYPE_NAME (event)); |
1548 event, GST_EVENT_TYPE_NAME (event)); |
1678 #endif |
1549 #endif |
1679 |
1550 |
1680 if (!QUEUE_IS_USING_TEMP_FILE (queue)) { |
1551 /* just forward upstream */ |
1681 /* just forward upstream */ |
1552 res = gst_pad_push_event (queue->sinkpad, event); |
1682 res = gst_pad_push_event (queue->sinkpad, event); |
|
1683 } else { |
|
1684 /* when using a temp file, we unblock the pending read */ |
|
1685 res = TRUE; |
|
1686 gst_event_unref (event); |
|
1687 } |
|
1688 |
1553 |
1689 return res; |
1554 return res; |
1690 } |
1555 } |
1691 |
1556 |
1692 static gboolean |
1557 static gboolean |
1738 gst_query_set_position (query, format, peer_pos); |
1603 gst_query_set_position (query, format, peer_pos); |
1739 break; |
1604 break; |
1740 } |
1605 } |
1741 case GST_QUERY_DURATION: |
1606 case GST_QUERY_DURATION: |
1742 { |
1607 { |
1743 GST_DEBUG_OBJECT (queue, "doing peer query"); |
1608 GST_DEBUG_OBJECT (queue, "waiting for preroll in duration query"); |
1609 |
|
1610 GST_QUEUE_MUTEX_LOCK (queue); |
|
1611 /* we have to wait until the upstream element is at least paused, which |
|
1612 * happened when we received a first item. */ |
|
1613 while (gst_queue_is_empty (queue)) { |
|
1614 GST_QUEUE_WAIT_ADD_CHECK (queue, flushing); |
|
1615 } |
|
1616 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1744 |
1617 |
1745 if (!gst_queue_peer_query (queue, queue->sinkpad, query)) |
1618 if (!gst_queue_peer_query (queue, queue->sinkpad, query)) |
1746 goto peer_failed; |
1619 goto peer_failed; |
1747 |
1620 |
1748 GST_DEBUG_OBJECT (queue, "peer query success"); |
1621 GST_DEBUG_OBJECT (queue, "peer query success"); |
1749 break; |
|
1750 } |
|
1751 case GST_QUERY_BUFFERING: |
|
1752 { |
|
1753 GstFormat format; |
|
1754 |
|
1755 GST_DEBUG_OBJECT (queue, "query buffering"); |
|
1756 |
|
1757 if (!QUEUE_IS_USING_TEMP_FILE (queue)) { |
|
1758 /* no temp file, just forward to the peer */ |
|
1759 if (!gst_queue_peer_query (queue, queue->sinkpad, query)) |
|
1760 goto peer_failed; |
|
1761 GST_DEBUG_OBJECT (queue, "buffering forwarded to peer"); |
|
1762 } else { |
|
1763 gint64 start, stop; |
|
1764 |
|
1765 gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL); |
|
1766 |
|
1767 switch (format) { |
|
1768 case GST_FORMAT_PERCENT: |
|
1769 { |
|
1770 gint64 duration; |
|
1771 GstFormat peer_fmt; |
|
1772 |
|
1773 peer_fmt = GST_FORMAT_BYTES; |
|
1774 |
|
1775 if (!gst_pad_query_peer_duration (queue->sinkpad, &peer_fmt, |
|
1776 &duration)) |
|
1777 goto peer_failed; |
|
1778 |
|
1779 GST_DEBUG_OBJECT (queue, "duration %" G_GINT64_FORMAT ", writing %" |
|
1780 G_GINT64_FORMAT, duration, queue->writing_pos); |
|
1781 |
|
1782 start = 0; |
|
1783 /* get our available data relative to the duration */ |
|
1784 if (duration != -1) |
|
1785 stop = GST_FORMAT_PERCENT_MAX * queue->writing_pos / duration; |
|
1786 else |
|
1787 stop = -1; |
|
1788 break; |
|
1789 } |
|
1790 case GST_FORMAT_BYTES: |
|
1791 start = 0; |
|
1792 stop = queue->writing_pos; |
|
1793 break; |
|
1794 default: |
|
1795 start = -1; |
|
1796 stop = -1; |
|
1797 break; |
|
1798 } |
|
1799 gst_query_set_buffering_percent (query, queue->is_buffering, 100); |
|
1800 gst_query_set_buffering_range (query, format, start, stop, -1); |
|
1801 } |
|
1802 break; |
1622 break; |
1803 } |
1623 } |
1804 default: |
1624 default: |
1805 /* peer handled other queries */ |
1625 /* peer handled other queries */ |
1806 if (!gst_queue_peer_query (queue, queue->sinkpad, query)) |
1626 if (!gst_queue_peer_query (queue, queue->sinkpad, query)) |
1812 |
1632 |
1813 /* ERRORS */ |
1633 /* ERRORS */ |
1814 peer_failed: |
1634 peer_failed: |
1815 { |
1635 { |
1816 GST_DEBUG_OBJECT (queue, "failed peer query"); |
1636 GST_DEBUG_OBJECT (queue, "failed peer query"); |
1637 return FALSE; |
|
1638 } |
|
1639 flushing: |
|
1640 { |
|
1641 GST_DEBUG_OBJECT (queue, "flushing while waiting for query"); |
|
1642 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1817 return FALSE; |
1643 return FALSE; |
1818 } |
1644 } |
1819 } |
1645 } |
1820 |
1646 |
1821 static GstFlowReturn |
1647 static GstFlowReturn |
1853 { |
1679 { |
1854 GstQueue *queue; |
1680 GstQueue *queue; |
1855 gboolean ret; |
1681 gboolean ret; |
1856 |
1682 |
1857 queue = GST_QUEUE (gst_pad_get_parent (pad)); |
1683 queue = GST_QUEUE (gst_pad_get_parent (pad)); |
1858 |
|
1859 /* we can operate in pull mode when we are using a tempfile */ |
1684 /* we can operate in pull mode when we are using a tempfile */ |
1860 ret = QUEUE_IS_USING_TEMP_FILE (queue); |
1685 ret = QUEUE_IS_USING_TEMP_FILE (queue); |
1861 |
|
1862 gst_object_unref (GST_OBJECT (queue)); |
1686 gst_object_unref (GST_OBJECT (queue)); |
1863 |
1687 |
1864 return ret; |
1688 return ret; |
1865 } |
1689 } |
1866 |
1690 |
2030 * be able to preceed. |
1854 * be able to preceed. |
2031 */ |
1855 */ |
2032 #define QUEUE_THRESHOLD_CHANGE(q)\ |
1856 #define QUEUE_THRESHOLD_CHANGE(q)\ |
2033 g_cond_signal (queue->item_add); |
1857 g_cond_signal (queue->item_add); |
2034 |
1858 |
2035 static void |
1859 static gboolean |
2036 gst_queue_set_temp_template (GstQueue * queue, const gchar * template) |
1860 gst_queue_set_temp_location (GstQueue * queue, const gchar * location) |
2037 { |
1861 { |
2038 GstState state; |
1862 GstState state; |
2039 |
1863 |
2040 /* the element must be stopped in order to do this */ |
1864 /* the element must be stopped in order to do this */ |
2041 GST_OBJECT_LOCK (queue); |
1865 GST_OBJECT_LOCK (queue); |
2043 if (state != GST_STATE_READY && state != GST_STATE_NULL) |
1867 if (state != GST_STATE_READY && state != GST_STATE_NULL) |
2044 goto wrong_state; |
1868 goto wrong_state; |
2045 GST_OBJECT_UNLOCK (queue); |
1869 GST_OBJECT_UNLOCK (queue); |
2046 |
1870 |
2047 /* set new location */ |
1871 /* set new location */ |
2048 g_free (queue->temp_template); |
1872 g_free (queue->temp_location); |
2049 queue->temp_template = g_strdup (template); |
1873 queue->temp_location = g_strdup (location); |
2050 |
1874 |
2051 return; |
1875 g_object_notify (G_OBJECT (queue), "temp-location"); |
1876 |
|
1877 return TRUE; |
|
2052 |
1878 |
2053 /* ERROR */ |
1879 /* ERROR */ |
2054 wrong_state: |
1880 wrong_state: |
2055 { |
1881 { |
2056 GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state"); |
1882 GST_DEBUG_OBJECT (queue, "setting temp-location in wrong state"); |
2057 GST_OBJECT_UNLOCK (queue); |
1883 GST_OBJECT_UNLOCK (queue); |
1884 return FALSE; |
|
2058 } |
1885 } |
2059 } |
1886 } |
2060 |
1887 |
2061 static void |
1888 static void |
2062 gst_queue_set_property (GObject * object, |
1889 gst_queue_set_property (GObject * object, |
2094 queue->low_percent = g_value_get_int (value); |
1921 queue->low_percent = g_value_get_int (value); |
2095 break; |
1922 break; |
2096 case PROP_HIGH_PERCENT: |
1923 case PROP_HIGH_PERCENT: |
2097 queue->high_percent = g_value_get_int (value); |
1924 queue->high_percent = g_value_get_int (value); |
2098 break; |
1925 break; |
2099 case PROP_TEMP_TEMPLATE: |
|
2100 gst_queue_set_temp_template (queue, g_value_get_string (value)); |
|
2101 break; |
|
2102 case PROP_TEMP_LOCATION: |
1926 case PROP_TEMP_LOCATION: |
2103 g_free (queue->temp_location); |
1927 gst_queue_set_temp_location (queue, g_value_dup_string (value)); |
2104 queue->temp_location = g_value_dup_string (value); |
|
2105 /* you can set the property back to NULL to make it use the temp-tmpl |
|
2106 * property. */ |
|
2107 queue->temp_location_set = queue->temp_location != NULL; |
|
2108 break; |
1928 break; |
2109 default: |
1929 default: |
2110 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); |
1930 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); |
2111 break; |
1931 break; |
2112 } |
1932 } |
2150 case PROP_LOW_PERCENT: |
1970 case PROP_LOW_PERCENT: |
2151 g_value_set_int (value, queue->low_percent); |
1971 g_value_set_int (value, queue->low_percent); |
2152 break; |
1972 break; |
2153 case PROP_HIGH_PERCENT: |
1973 case PROP_HIGH_PERCENT: |
2154 g_value_set_int (value, queue->high_percent); |
1974 g_value_set_int (value, queue->high_percent); |
2155 break; |
|
2156 case PROP_TEMP_TEMPLATE: |
|
2157 g_value_set_string (value, queue->temp_template); |
|
2158 break; |
1975 break; |
2159 case PROP_TEMP_LOCATION: |
1976 case PROP_TEMP_LOCATION: |
2160 g_value_set_string (value, queue->temp_location); |
1977 g_value_set_string (value, queue->temp_location); |
2161 break; |
1978 break; |
2162 default: |
1979 default: |
2176 |
1993 |
2177 #ifdef ENABLE_NLS |
1994 #ifdef ENABLE_NLS |
2178 GST_DEBUG ("binding text domain %s to locale dir %s", GETTEXT_PACKAGE, |
1995 GST_DEBUG ("binding text domain %s to locale dir %s", GETTEXT_PACKAGE, |
2179 LOCALEDIR); |
1996 LOCALEDIR); |
2180 bindtextdomain (GETTEXT_PACKAGE, LOCALEDIR); |
1997 bindtextdomain (GETTEXT_PACKAGE, LOCALEDIR); |
2181 bind_textdomain_codeset (GETTEXT_PACKAGE, "UTF-8"); |
|
2182 #endif /* ENABLE_NLS */ |
1998 #endif /* ENABLE_NLS */ |
2183 |
1999 |
2184 return gst_element_register (plugin, "queue2", GST_RANK_NONE, GST_TYPE_QUEUE); |
2000 return gst_element_register (plugin, "queue2", GST_RANK_NONE, GST_TYPE_QUEUE); |
2185 } |
2001 } |
2186 |
2002 |