changeset 8 | 4a7fac7dd34a |
parent 0 | 0e761a78d257 |
child 30 | 7e817e7e631c |
7:71e347f905f2 | 8:4a7fac7dd34a |
---|---|
25 /** |
25 /** |
26 * SECTION:element-queue2 |
26 * SECTION:element-queue2 |
27 * @short_description: Asynchronous data queue. |
27 * @short_description: Asynchronous data queue. |
28 * |
28 * |
29 * Data is queued until one of the limits specified by the |
29 * Data is queued until one of the limits specified by the |
30 * #GstQueue:max-size-buffers, #GstQueue:max-size-bytes and/or |
30 * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or |
31 * #GstQueue:max-size-time properties has been reached. Any attempt to push |
31 * #GstQueue2:max-size-time properties has been reached. Any attempt to push |
32 * more buffers into the queue will block the pushing thread until more space |
32 * more buffers into the queue will block the pushing thread until more space |
33 * becomes available. |
33 * becomes available. |
34 * |
34 * |
35 * The queue will create a new thread on the source pad to decouple the |
35 * The queue will create a new thread on the source pad to decouple the |
36 * processing on sink and source pad. |
36 * processing on sink and source pad. |
37 * |
37 * |
38 * You can query how many buffers are queued by reading the |
38 * You can query how many buffers are queued by reading the |
39 * #GstQueue:current-level-buffers property. |
39 * #GstQueue2:current-level-buffers property. |
40 * |
40 * |
41 * The default queue size limits are 100 buffers, 2MB of data, or |
41 * The default queue size limits are 100 buffers, 2MB of data, or |
42 * two seconds worth of data, whichever is reached first. |
42 * two seconds worth of data, whichever is reached first. |
43 * |
43 * |
44 * If you set temp-location, the element will buffer data on the file |
44 * If you set temp-tmpl to a value such as /tmp/gstreamer-XXXXXX, the element |
45 * specified by it. By using this, it will buffer the entire |
45 * will allocate a random free filename and buffer data in the file. |
46 * stream data on the file independently of the queue size limits, they |
46 * By using this, it will buffer the entire stream data on the file independently |
47 * will only be used for buffering statistics. |
47 * of the queue size limits, they will only be used for buffering statistics. |
48 * |
48 * |
49 * Since 0.10.24, setting the temp-location property with a filename is deprecated |
|
50 * because it's impossible to securely open a temporary file in this way. The |
|
51 * property will still be used to notify the application of the allocated |
|
52 * filename, though. |
|
53 * |
|
54 * Last reviewed on 2009-07-10 (0.10.24) |
|
49 */ |
55 */ |
50 |
56 |
51 #ifdef HAVE_CONFIG_H |
57 #ifdef HAVE_CONFIG_H |
52 #include "config.h" |
58 #include "config.h" |
53 #endif |
59 #endif |
55 #include <glib/gstdio.h> |
61 #include <glib/gstdio.h> |
56 |
62 |
57 #include <gst/gst.h> |
63 #include <gst/gst.h> |
58 #include <gst/gst-i18n-plugin.h> |
64 #include <gst/gst-i18n-plugin.h> |
59 |
65 |
60 #ifdef __SYMBIAN32__ |
66 #ifdef G_OS_WIN32 |
61 #include <glib_global.h> |
67 #include <io.h> /* lseek, open, close, read */ |
68 #undef lseek |
|
69 #define lseek _lseeki64 |
|
70 #undef off_t |
|
71 #define off_t guint64 |
|
72 #else |
|
73 #include <unistd.h> |
|
62 #endif |
74 #endif |
75 |
|
63 static const GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue", |
76 static const GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue", |
64 "Generic", |
77 "Generic", |
65 "Simple data queue", |
78 "Simple data queue", |
66 "Erik Walthinsen <omega@cse.ogi.edu>, " |
79 "Erik Walthinsen <omega@cse.ogi.edu>, " |
67 "Wim Taymans <wim.taymans@gmail.com>"); |
80 "Wim Taymans <wim.taymans@gmail.com>"); |
94 #define DEFAULT_LOW_PERCENT 10 |
107 #define DEFAULT_LOW_PERCENT 10 |
95 #define DEFAULT_HIGH_PERCENT 99 |
108 #define DEFAULT_HIGH_PERCENT 99 |
96 |
109 |
97 /* other defines */ |
110 /* other defines */ |
98 #define DEFAULT_BUFFER_SIZE 4096 |
111 #define DEFAULT_BUFFER_SIZE 4096 |
99 #define QUEUE_IS_USING_TEMP_FILE(queue) (queue->temp_location != NULL) |
112 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location_set || (queue)->temp_template != NULL) |
100 |
113 |
101 enum |
114 enum |
102 { |
115 { |
103 PROP_0, |
116 PROP_0, |
104 PROP_CUR_LEVEL_BUFFERS, |
117 PROP_CUR_LEVEL_BUFFERS, |
109 PROP_MAX_SIZE_TIME, |
122 PROP_MAX_SIZE_TIME, |
110 PROP_USE_BUFFERING, |
123 PROP_USE_BUFFERING, |
111 PROP_USE_RATE_ESTIMATE, |
124 PROP_USE_RATE_ESTIMATE, |
112 PROP_LOW_PERCENT, |
125 PROP_LOW_PERCENT, |
113 PROP_HIGH_PERCENT, |
126 PROP_HIGH_PERCENT, |
127 PROP_TEMP_TEMPLATE, |
|
114 PROP_TEMP_LOCATION |
128 PROP_TEMP_LOCATION |
115 }; |
129 }; |
116 |
130 |
117 #define GST_TYPE_QUEUE \ |
131 #define GST_TYPE_QUEUE \ |
118 (gst_queue_get_type()) |
132 (gst_queue_get_type()) |
197 GCond *item_add; /* signals buffers now available for reading */ |
211 GCond *item_add; /* signals buffers now available for reading */ |
198 gboolean waiting_del; |
212 gboolean waiting_del; |
199 GCond *item_del; /* signals space now available for writing */ |
213 GCond *item_del; /* signals space now available for writing */ |
200 |
214 |
201 /* temp location stuff */ |
215 /* temp location stuff */ |
216 gchar *temp_template; |
|
217 gboolean temp_location_set; |
|
202 gchar *temp_location; |
218 gchar *temp_location; |
203 FILE *temp_file; |
219 FILE *temp_file; |
204 guint64 writing_pos; |
220 guint64 writing_pos; |
205 guint64 reading_pos; |
221 guint64 reading_pos; |
222 guint64 max_reading_pos; |
|
206 /* we need this to send the first new segment event of the stream |
223 /* we need this to send the first new segment event of the stream |
207 * because we can't save it on the file */ |
224 * because we can't save it on the file */ |
208 gboolean segment_event_received; |
225 gboolean segment_event_received; |
209 GstEvent *starting_segment; |
226 GstEvent *starting_segment; |
210 |
227 |
226 queue->cur_level.bytes, \ |
243 queue->cur_level.bytes, \ |
227 queue->max_level.bytes, \ |
244 queue->max_level.bytes, \ |
228 queue->cur_level.time, \ |
245 queue->cur_level.time, \ |
229 queue->max_level.time, \ |
246 queue->max_level.time, \ |
230 (guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \ |
247 (guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \ |
231 queue->writing_pos - queue->reading_pos : \ |
248 queue->writing_pos - queue->max_reading_pos : \ |
232 queue->queue->length)) |
249 queue->queue->length)) |
233 |
250 |
234 #define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ |
251 #define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ |
235 g_mutex_lock (q->qlock); \ |
252 g_mutex_lock (q->qlock); \ |
236 } G_STMT_END |
253 } G_STMT_END |
365 |
382 |
366 /* properties */ |
383 /* properties */ |
367 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES, |
384 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES, |
368 g_param_spec_uint ("current-level-bytes", "Current level (kB)", |
385 g_param_spec_uint ("current-level-bytes", "Current level (kB)", |
369 "Current amount of data in the queue (bytes)", |
386 "Current amount of data in the queue (bytes)", |
370 0, G_MAXUINT, 0, G_PARAM_READABLE)); |
387 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); |
371 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS, |
388 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS, |
372 g_param_spec_uint ("current-level-buffers", "Current level (buffers)", |
389 g_param_spec_uint ("current-level-buffers", "Current level (buffers)", |
373 "Current number of buffers in the queue", |
390 "Current number of buffers in the queue", |
374 0, G_MAXUINT, 0, G_PARAM_READABLE)); |
391 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); |
375 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME, |
392 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME, |
376 g_param_spec_uint64 ("current-level-time", "Current level (ns)", |
393 g_param_spec_uint64 ("current-level-time", "Current level (ns)", |
377 "Current amount of data in the queue (in ns)", |
394 "Current amount of data in the queue (in ns)", |
378 0, G_MAXUINT64, 0, G_PARAM_READABLE)); |
395 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); |
379 |
396 |
380 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES, |
397 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES, |
381 g_param_spec_uint ("max-size-bytes", "Max. size (kB)", |
398 g_param_spec_uint ("max-size-bytes", "Max. size (kB)", |
382 "Max. amount of data in the queue (bytes, 0=disable)", |
399 "Max. amount of data in the queue (bytes, 0=disable)", |
383 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE)); |
400 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, |
401 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
384 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS, |
402 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS, |
385 g_param_spec_uint ("max-size-buffers", "Max. size (buffers)", |
403 g_param_spec_uint ("max-size-buffers", "Max. size (buffers)", |
386 "Max. number of buffers in the queue (0=disable)", |
404 "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT, |
387 0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE)); |
405 DEFAULT_MAX_SIZE_BUFFERS, |
406 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
388 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME, |
407 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME, |
389 g_param_spec_uint64 ("max-size-time", "Max. size (ns)", |
408 g_param_spec_uint64 ("max-size-time", "Max. size (ns)", |
390 "Max. amount of data in the queue (in ns, 0=disable)", |
409 "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64, |
391 0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE)); |
410 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
392 |
411 |
393 g_object_class_install_property (gobject_class, PROP_USE_BUFFERING, |
412 g_object_class_install_property (gobject_class, PROP_USE_BUFFERING, |
394 g_param_spec_boolean ("use-buffering", "Use buffering", |
413 g_param_spec_boolean ("use-buffering", "Use buffering", |
395 "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds", |
414 "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds", |
396 DEFAULT_USE_BUFFERING, G_PARAM_READWRITE)); |
415 DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
397 g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE, |
416 g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE, |
398 g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate", |
417 g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate", |
399 "Estimate the bitrate of the stream to calculate time level", |
418 "Estimate the bitrate of the stream to calculate time level", |
400 DEFAULT_USE_RATE_ESTIMATE, G_PARAM_READWRITE)); |
419 DEFAULT_USE_RATE_ESTIMATE, |
420 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
401 g_object_class_install_property (gobject_class, PROP_LOW_PERCENT, |
421 g_object_class_install_property (gobject_class, PROP_LOW_PERCENT, |
402 g_param_spec_int ("low-percent", "Low percent", |
422 g_param_spec_int ("low-percent", "Low percent", |
403 "Low threshold for buffering to start", |
423 "Low threshold for buffering to start", 0, 100, DEFAULT_LOW_PERCENT, |
404 0, 100, DEFAULT_LOW_PERCENT, G_PARAM_READWRITE)); |
424 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
405 g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT, |
425 g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT, |
406 g_param_spec_int ("high-percent", "High percent", |
426 g_param_spec_int ("high-percent", "High percent", |
407 "High threshold for buffering to finish", |
427 "High threshold for buffering to finish", 0, 100, |
408 0, 100, DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE)); |
428 DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
429 |
|
430 g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE, |
|
431 g_param_spec_string ("temp-template", "Temporary File Template", |
|
432 "File template to store temporary files in, should contain directory " |
|
433 "and XXXXXX. (NULL == disabled)", |
|
434 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
409 |
435 |
410 g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION, |
436 g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION, |
411 g_param_spec_string ("temp-location", "Temporary File Location", |
437 g_param_spec_string ("temp-location", "Temporary File Location", |
412 "Location of a temporary file to store data in", |
438 "Location to store temporary files in (Deprecated: Only read this " |
413 NULL, G_PARAM_READWRITE)); |
439 "property, use temp-tmpl to configure the name template)", |
440 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); |
|
414 |
441 |
415 gst_element_class_add_pad_template (gstelement_class, |
442 gst_element_class_add_pad_template (gstelement_class, |
416 gst_static_pad_template_get (&srctemplate)); |
443 gst_static_pad_template_get (&srctemplate)); |
417 gst_element_class_add_pad_template (gstelement_class, |
444 gst_element_class_add_pad_template (gstelement_class, |
418 gst_static_pad_template_get (&sinktemplate)); |
445 gst_static_pad_template_get (&sinktemplate)); |
485 queue->waiting_del = FALSE; |
512 queue->waiting_del = FALSE; |
486 queue->item_del = g_cond_new (); |
513 queue->item_del = g_cond_new (); |
487 queue->queue = g_queue_new (); |
514 queue->queue = g_queue_new (); |
488 |
515 |
489 /* tempfile related */ |
516 /* tempfile related */ |
517 queue->temp_template = NULL; |
|
490 queue->temp_location = NULL; |
518 queue->temp_location = NULL; |
491 queue->temp_file = NULL; |
519 queue->temp_location_set = FALSE; |
492 |
520 |
493 GST_DEBUG_OBJECT (queue, |
521 GST_DEBUG_OBJECT (queue, |
494 "initialized queue's not_empty & not_full conditions"); |
522 "initialized queue's not_empty & not_full conditions"); |
495 } |
523 } |
496 |
524 |
514 g_cond_free (queue->item_del); |
542 g_cond_free (queue->item_del); |
515 g_timer_destroy (queue->in_timer); |
543 g_timer_destroy (queue->in_timer); |
516 g_timer_destroy (queue->out_timer); |
544 g_timer_destroy (queue->out_timer); |
517 |
545 |
518 /* temp_file path cleanup */ |
546 /* temp_file path cleanup */ |
519 if (queue->temp_location != NULL) |
547 g_free (queue->temp_template); |
520 g_free (queue->temp_location); |
548 g_free (queue->temp_location); |
521 |
549 |
522 G_OBJECT_CLASS (parent_class)->finalize (object); |
550 G_OBJECT_CLASS (parent_class)->finalize (object); |
523 } |
551 } |
524 |
552 |
525 static GstCaps * |
553 static GstCaps * |
687 queue->buffering_iteration++; |
715 queue->buffering_iteration++; |
688 post = TRUE; |
716 post = TRUE; |
689 } |
717 } |
690 } |
718 } |
691 if (post) { |
719 if (post) { |
720 GstMessage *message; |
|
721 GstBufferingMode mode; |
|
722 |
|
692 /* scale to high percent so that it becomes the 100% mark */ |
723 /* scale to high percent so that it becomes the 100% mark */ |
693 percent = percent * 100 / queue->high_percent; |
724 percent = percent * 100 / queue->high_percent; |
694 /* clip */ |
725 /* clip */ |
695 if (percent > 100) |
726 if (percent > 100) |
696 percent = 100; |
727 percent = 100; |
697 |
728 |
729 if (QUEUE_IS_USING_TEMP_FILE (queue)) |
|
730 mode = GST_BUFFERING_DOWNLOAD; |
|
731 else |
|
732 mode = GST_BUFFERING_STREAM; |
|
733 |
|
698 GST_DEBUG_OBJECT (queue, "buffering %d percent", percent); |
734 GST_DEBUG_OBJECT (queue, "buffering %d percent", percent); |
699 gst_element_post_message (GST_ELEMENT_CAST (queue), |
735 message = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent); |
700 gst_message_new_buffering (GST_OBJECT_CAST (queue), percent)); |
736 gst_message_set_buffering_stats (message, mode, |
737 queue->byte_in_rate, queue->byte_out_rate, -1); |
|
738 |
|
739 gst_element_post_message (GST_ELEMENT_CAST (queue), message); |
|
740 |
|
701 } else { |
741 } else { |
702 GST_DEBUG_OBJECT (queue, "filled %d percent", percent); |
742 GST_DEBUG_OBJECT (queue, "filled %d percent", percent); |
703 } |
743 } |
704 |
744 |
705 #undef GET_PERCENT |
745 #undef GET_PERCENT |
798 |
838 |
799 /* reset the values to calculate rate over the next interval */ |
839 /* reset the values to calculate rate over the next interval */ |
800 queue->last_out_elapsed = elapsed; |
840 queue->last_out_elapsed = elapsed; |
801 queue->bytes_out = 0; |
841 queue->bytes_out = 0; |
802 } |
842 } |
843 if (queue->byte_in_rate > 0.0) { |
|
844 queue->cur_level.rate_time = |
|
845 queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND; |
|
846 } |
|
803 GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT, |
847 GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT, |
804 queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time)); |
848 queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time)); |
805 } |
849 } |
806 |
850 |
807 static void |
851 static void |
809 { |
853 { |
810 guint size; |
854 guint size; |
811 guint8 *data; |
855 guint8 *data; |
812 int ret; |
856 int ret; |
813 |
857 |
858 #ifdef HAVE_FSEEKO |
|
859 fseeko (queue->temp_file, (off_t) queue->writing_pos, SEEK_SET); |
|
860 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32) |
|
861 lseek (fileno (queue->temp_file), (off_t) queue->writing_pos, SEEK_SET); |
|
862 #else |
|
814 fseek (queue->temp_file, queue->writing_pos, SEEK_SET); |
863 fseek (queue->temp_file, queue->writing_pos, SEEK_SET); |
864 #endif |
|
815 |
865 |
816 data = GST_BUFFER_DATA (buffer); |
866 data = GST_BUFFER_DATA (buffer); |
817 size = GST_BUFFER_SIZE (buffer); |
867 size = GST_BUFFER_SIZE (buffer); |
818 |
868 |
819 ret = fwrite (data, 1, size, queue->temp_file); |
869 ret = fwrite (data, 1, size, queue->temp_file); |
820 if (ret < size) { |
870 if (ret < size) { |
821 /* FIXME do something useful here */ |
871 /* FIXME do something useful here */ |
822 GST_ERROR_OBJECT (queue, "fwrite returned error"); |
872 GST_ERROR_OBJECT (queue, "fwrite returned error"); |
823 } |
873 } |
824 queue->writing_pos += size; |
874 queue->writing_pos += size; |
875 |
|
876 if (queue->writing_pos > queue->max_reading_pos) |
|
877 queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos; |
|
878 else |
|
879 queue->cur_level.bytes = 0; |
|
825 } |
880 } |
826 |
881 |
827 /* see if there is enough data in the file to read a full buffer */ |
882 /* see if there is enough data in the file to read a full buffer */ |
828 static gboolean |
883 static gboolean |
829 gst_queue_have_data (GstQueue * queue, guint64 offset, guint length) |
884 gst_queue_have_data (GstQueue * queue, guint64 offset, guint length) |
854 } |
909 } |
855 |
910 |
856 #ifdef HAVE_FSEEKO |
911 #ifdef HAVE_FSEEKO |
857 if (fseeko (queue->temp_file, (off_t) offset, SEEK_SET) != 0) |
912 if (fseeko (queue->temp_file, (off_t) offset, SEEK_SET) != 0) |
858 goto seek_failed; |
913 goto seek_failed; |
859 #elif defined (G_OS_UNIX) |
914 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32) |
860 if (lseek (fileno (queue->temp_file), (off_t) offset, |
915 if (lseek (fileno (queue->temp_file), (off_t) offset, |
861 SEEK_SET) == (off_t) - 1) |
916 SEEK_SET) == (off_t) - 1) |
862 goto seek_failed; |
917 goto seek_failed; |
863 #else |
918 #else |
864 if (fseek (queue->temp_file, (long) offset, SEEK_SET) != 0) |
919 if (fseek (queue->temp_file, (long) offset, SEEK_SET) != 0) |
887 GST_BUFFER_OFFSET_END (buf) = offset + length; |
942 GST_BUFFER_OFFSET_END (buf) = offset + length; |
888 |
943 |
889 *buffer = buf; |
944 *buffer = buf; |
890 |
945 |
891 queue->reading_pos = offset + length; |
946 queue->reading_pos = offset + length; |
947 queue->max_reading_pos = MAX (queue->max_reading_pos, queue->reading_pos); |
|
948 |
|
949 if (queue->writing_pos > queue->max_reading_pos) |
|
950 queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos; |
|
951 else |
|
952 queue->cur_level.bytes = 0; |
|
892 |
953 |
893 return GST_FLOW_OK; |
954 return GST_FLOW_OK; |
894 |
955 |
895 /* ERRORS */ |
956 /* ERRORS */ |
896 out_flushing: |
957 out_flushing: |
949 } |
1010 } |
950 |
1011 |
951 static gboolean |
1012 static gboolean |
952 gst_queue_open_temp_location_file (GstQueue * queue) |
1013 gst_queue_open_temp_location_file (GstQueue * queue) |
953 { |
1014 { |
954 /* nothing to do */ |
1015 gint fd = -1; |
955 if (queue->temp_location == NULL) |
1016 gchar *name = NULL; |
956 goto no_filename; |
1017 |
957 |
1018 GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template); |
958 /* open the file for update/writing */ |
1019 |
959 queue->temp_file = g_fopen (queue->temp_location, "wb+"); |
1020 /* we have two cases: |
960 /* error creating file */ |
1021 * - temp_location was set to something !NULL (Deprecated). in this case we |
961 if (queue->temp_file == NULL) |
1022 * open the specified filename. |
962 goto open_failed; |
1023 * - temp_template was set, allocate a filename and open that filename |
1024 */ |
|
1025 if (!queue->temp_location_set) { |
|
1026 /* nothing to do */ |
|
1027 if (queue->temp_template == NULL) |
|
1028 goto no_directory; |
|
1029 |
|
1030 /* make copy of the template, we don't want to change this */ |
|
1031 name = g_strdup (queue->temp_template); |
|
1032 fd = g_mkstemp (name); |
|
1033 if (fd == -1) |
|
1034 goto mkstemp_failed; |
|
1035 |
|
1036 /* open the file for update/writing */ |
|
1037 queue->temp_file = fdopen (fd, "wb+"); |
|
1038 /* error creating file */ |
|
1039 if (queue->temp_file == NULL) |
|
1040 goto open_failed; |
|
1041 |
|
1042 g_free (queue->temp_location); |
|
1043 queue->temp_location = name; |
|
1044 |
|
1045 g_object_notify (G_OBJECT (queue), "temp-location"); |
|
1046 } else { |
|
1047 /* open the file for update/writing, this is deprecated but we still need to |
|
1048 * support it for API/ABI compatibility */ |
|
1049 queue->temp_file = g_fopen (queue->temp_location, "wb+"); |
|
1050 /* error creating file */ |
|
1051 if (queue->temp_file == NULL) |
|
1052 goto open_failed; |
|
1053 } |
|
963 |
1054 |
964 queue->writing_pos = 0; |
1055 queue->writing_pos = 0; |
965 queue->reading_pos = 0; |
1056 queue->reading_pos = 0; |
1057 queue->max_reading_pos = 0; |
|
966 |
1058 |
967 return TRUE; |
1059 return TRUE; |
968 |
1060 |
969 /* ERRORS */ |
1061 /* ERRORS */ |
970 no_filename: |
1062 no_directory: |
971 { |
1063 { |
972 GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND, |
1064 GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND, |
973 (_("No file name specified.")), (NULL)); |
1065 (_("No Temp directory specified.")), (NULL)); |
1066 return FALSE; |
|
1067 } |
|
1068 mkstemp_failed: |
|
1069 { |
|
1070 GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ, |
|
1071 (_("Could not create temp file \"%s\"."), queue->temp_template), |
|
1072 GST_ERROR_SYSTEM); |
|
1073 g_free (name); |
|
974 return FALSE; |
1074 return FALSE; |
975 } |
1075 } |
976 open_failed: |
1076 open_failed: |
977 { |
1077 { |
978 GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ, |
1078 GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ, |
979 (_("Could not open file \"%s\" for reading."), queue->temp_location), |
1079 (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM); |
980 GST_ERROR_SYSTEM); |
1080 g_free (name); |
1081 if (fd != -1) |
|
1082 close (fd); |
|
981 return FALSE; |
1083 return FALSE; |
982 } |
1084 } |
983 } |
1085 } |
984 |
1086 |
985 static void |
1087 static void |
986 gst_queue_close_temp_location_file (GstQueue * queue) |
1088 gst_queue_close_temp_location_file (GstQueue * queue) |
987 { |
1089 { |
988 /* nothing to do */ |
1090 /* nothing to do */ |
989 if (queue->temp_file == NULL) |
1091 if (queue->temp_file == NULL) |
990 return; |
1092 return; |
1093 |
|
1094 GST_DEBUG_OBJECT (queue, "closing temp file"); |
|
991 |
1095 |
992 /* we don't remove the file so that the application can use it as a cache |
1096 /* we don't remove the file so that the application can use it as a cache |
993 * later on */ |
1097 * later on */ |
994 fflush (queue->temp_file); |
1098 fflush (queue->temp_file); |
995 fclose (queue->temp_file); |
1099 fclose (queue->temp_file); |
996 remove (queue->temp_location); |
1100 remove (queue->temp_location); |
997 queue->temp_file = NULL; |
1101 queue->temp_file = NULL; |
998 } |
1102 } |
999 |
1103 |
1000 static void |
1104 static void |
1105 gst_queue_flush_temp_file (GstQueue * queue) |
|
1106 { |
|
1107 if (queue->temp_file == NULL) |
|
1108 return; |
|
1109 |
|
1110 GST_DEBUG_OBJECT (queue, "flushing temp file"); |
|
1111 |
|
1112 queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file); |
|
1113 |
|
1114 queue->writing_pos = 0; |
|
1115 queue->reading_pos = 0; |
|
1116 queue->max_reading_pos = 0; |
|
1117 } |
|
1118 |
|
1119 static void |
|
1001 gst_queue_locked_flush (GstQueue * queue) |
1120 gst_queue_locked_flush (GstQueue * queue) |
1002 { |
1121 { |
1003 if (QUEUE_IS_USING_TEMP_FILE (queue)) { |
1122 if (QUEUE_IS_USING_TEMP_FILE (queue)) { |
1004 gst_queue_close_temp_location_file (queue); |
1123 gst_queue_flush_temp_file (queue); |
1005 gst_queue_open_temp_location_file (queue); |
|
1006 } else { |
1124 } else { |
1007 while (!g_queue_is_empty (queue->queue)) { |
1125 while (!g_queue_is_empty (queue->queue)) { |
1008 GstMiniObject *data = g_queue_pop_head (queue->queue); |
1126 GstMiniObject *data = g_queue_pop_head (queue->queue); |
1009 |
1127 |
1010 /* Then lose another reference because we are supposed to destroy that |
1128 /* Then lose another reference because we are supposed to destroy that |
1037 |
1155 |
1038 /* add buffer to the statistics */ |
1156 /* add buffer to the statistics */ |
1039 queue->cur_level.buffers++; |
1157 queue->cur_level.buffers++; |
1040 queue->cur_level.bytes += size; |
1158 queue->cur_level.bytes += size; |
1041 queue->bytes_in += size; |
1159 queue->bytes_in += size; |
1160 |
|
1042 /* apply new buffer to segment stats */ |
1161 /* apply new buffer to segment stats */ |
1043 apply_buffer (queue, buffer, &queue->sink_segment); |
1162 apply_buffer (queue, buffer, &queue->sink_segment); |
1044 /* update the byterate stats */ |
1163 /* update the byterate stats */ |
1045 update_in_rates (queue); |
1164 update_in_rates (queue); |
1046 |
1165 |
1066 if (QUEUE_IS_USING_TEMP_FILE (queue)) { |
1185 if (QUEUE_IS_USING_TEMP_FILE (queue)) { |
1067 if (queue->segment_event_received) |
1186 if (queue->segment_event_received) |
1068 goto unexpected_event; |
1187 goto unexpected_event; |
1069 |
1188 |
1070 queue->segment_event_received = TRUE; |
1189 queue->segment_event_received = TRUE; |
1190 if (queue->starting_segment != NULL) |
|
1191 gst_event_unref (queue->starting_segment); |
|
1071 queue->starting_segment = event; |
1192 queue->starting_segment = event; |
1193 item = NULL; |
|
1072 } |
1194 } |
1073 /* a new segment allows us to accept more buffers if we got UNEXPECTED |
1195 /* a new segment allows us to accept more buffers if we got UNEXPECTED |
1074 * from downstream */ |
1196 * from downstream */ |
1075 queue->unexpected = FALSE; |
1197 queue->unexpected = FALSE; |
1076 break; |
1198 break; |
1090 /* update the buffering status */ |
1212 /* update the buffering status */ |
1091 update_buffering (queue); |
1213 update_buffering (queue); |
1092 |
1214 |
1093 if (!QUEUE_IS_USING_TEMP_FILE (queue)) |
1215 if (!QUEUE_IS_USING_TEMP_FILE (queue)) |
1094 g_queue_push_tail (queue->queue, item); |
1216 g_queue_push_tail (queue->queue, item); |
1217 else |
|
1218 gst_mini_object_unref (GST_MINI_OBJECT_CAST (item)); |
|
1219 |
|
1095 GST_QUEUE_SIGNAL_ADD (queue); |
1220 GST_QUEUE_SIGNAL_ADD (queue); |
1096 } |
1221 } |
1097 |
1222 |
1098 return; |
1223 return; |
1099 |
1224 |
1264 /* never empty on EOS */ |
1389 /* never empty on EOS */ |
1265 if (queue->is_eos) |
1390 if (queue->is_eos) |
1266 return FALSE; |
1391 return FALSE; |
1267 |
1392 |
1268 if (QUEUE_IS_USING_TEMP_FILE (queue)) { |
1393 if (QUEUE_IS_USING_TEMP_FILE (queue)) { |
1269 return queue->writing_pos == queue->reading_pos; |
1394 return queue->writing_pos == queue->max_reading_pos; |
1270 } else { |
1395 } else { |
1271 if (queue->queue->length == 0) |
1396 if (queue->queue->length == 0) |
1272 return TRUE; |
1397 return TRUE; |
1273 } |
1398 } |
1274 |
1399 |
1284 if (queue->is_eos) |
1409 if (queue->is_eos) |
1285 return TRUE; |
1410 return TRUE; |
1286 |
1411 |
1287 /* if using file, we're never filled if we don't have EOS */ |
1412 /* if using file, we're never filled if we don't have EOS */ |
1288 if (QUEUE_IS_USING_TEMP_FILE (queue)) |
1413 if (QUEUE_IS_USING_TEMP_FILE (queue)) |
1414 return FALSE; |
|
1415 |
|
1416 /* we are never filled when we have no buffers at all */ |
|
1417 if (queue->cur_level.buffers == 0) |
|
1289 return FALSE; |
1418 return FALSE; |
1290 |
1419 |
1291 #define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \ |
1420 #define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \ |
1292 (queue->cur_level.format) >= (queue->max_level.format)) |
1421 (queue->cur_level.format) >= (queue->max_level.format)) |
1293 |
1422 |
1546 #ifndef GST_DISABLE_GST_DEBUG |
1675 #ifndef GST_DISABLE_GST_DEBUG |
1547 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)", |
1676 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)", |
1548 event, GST_EVENT_TYPE_NAME (event)); |
1677 event, GST_EVENT_TYPE_NAME (event)); |
1549 #endif |
1678 #endif |
1550 |
1679 |
1551 /* just forward upstream */ |
1680 if (!QUEUE_IS_USING_TEMP_FILE (queue)) { |
1552 res = gst_pad_push_event (queue->sinkpad, event); |
1681 /* just forward upstream */ |
1682 res = gst_pad_push_event (queue->sinkpad, event); |
|
1683 } else { |
|
1684 /* when using a temp file, we unblock the pending read */ |
|
1685 res = TRUE; |
|
1686 gst_event_unref (event); |
|
1687 } |
|
1553 |
1688 |
1554 return res; |
1689 return res; |
1555 } |
1690 } |
1556 |
1691 |
1557 static gboolean |
1692 static gboolean |
1603 gst_query_set_position (query, format, peer_pos); |
1738 gst_query_set_position (query, format, peer_pos); |
1604 break; |
1739 break; |
1605 } |
1740 } |
1606 case GST_QUERY_DURATION: |
1741 case GST_QUERY_DURATION: |
1607 { |
1742 { |
1608 GST_DEBUG_OBJECT (queue, "waiting for preroll in duration query"); |
1743 GST_DEBUG_OBJECT (queue, "doing peer query"); |
1609 |
|
1610 GST_QUEUE_MUTEX_LOCK (queue); |
|
1611 /* we have to wait until the upstream element is at least paused, which |
|
1612 * happened when we received a first item. */ |
|
1613 while (gst_queue_is_empty (queue)) { |
|
1614 GST_QUEUE_WAIT_ADD_CHECK (queue, flushing); |
|
1615 } |
|
1616 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1617 |
1744 |
1618 if (!gst_queue_peer_query (queue, queue->sinkpad, query)) |
1745 if (!gst_queue_peer_query (queue, queue->sinkpad, query)) |
1619 goto peer_failed; |
1746 goto peer_failed; |
1620 |
1747 |
1621 GST_DEBUG_OBJECT (queue, "peer query success"); |
1748 GST_DEBUG_OBJECT (queue, "peer query success"); |
1749 break; |
|
1750 } |
|
1751 case GST_QUERY_BUFFERING: |
|
1752 { |
|
1753 GstFormat format; |
|
1754 |
|
1755 GST_DEBUG_OBJECT (queue, "query buffering"); |
|
1756 |
|
1757 if (!QUEUE_IS_USING_TEMP_FILE (queue)) { |
|
1758 /* no temp file, just forward to the peer */ |
|
1759 if (!gst_queue_peer_query (queue, queue->sinkpad, query)) |
|
1760 goto peer_failed; |
|
1761 GST_DEBUG_OBJECT (queue, "buffering forwarded to peer"); |
|
1762 } else { |
|
1763 gint64 start, stop; |
|
1764 |
|
1765 gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL); |
|
1766 |
|
1767 switch (format) { |
|
1768 case GST_FORMAT_PERCENT: |
|
1769 { |
|
1770 gint64 duration; |
|
1771 GstFormat peer_fmt; |
|
1772 |
|
1773 peer_fmt = GST_FORMAT_BYTES; |
|
1774 |
|
1775 if (!gst_pad_query_peer_duration (queue->sinkpad, &peer_fmt, |
|
1776 &duration)) |
|
1777 goto peer_failed; |
|
1778 |
|
1779 GST_DEBUG_OBJECT (queue, "duration %" G_GINT64_FORMAT ", writing %" |
|
1780 G_GINT64_FORMAT, duration, queue->writing_pos); |
|
1781 |
|
1782 start = 0; |
|
1783 /* get our available data relative to the duration */ |
|
1784 if (duration != -1) |
|
1785 stop = GST_FORMAT_PERCENT_MAX * queue->writing_pos / duration; |
|
1786 else |
|
1787 stop = -1; |
|
1788 break; |
|
1789 } |
|
1790 case GST_FORMAT_BYTES: |
|
1791 start = 0; |
|
1792 stop = queue->writing_pos; |
|
1793 break; |
|
1794 default: |
|
1795 start = -1; |
|
1796 stop = -1; |
|
1797 break; |
|
1798 } |
|
1799 gst_query_set_buffering_percent (query, queue->is_buffering, 100); |
|
1800 gst_query_set_buffering_range (query, format, start, stop, -1); |
|
1801 } |
|
1622 break; |
1802 break; |
1623 } |
1803 } |
1624 default: |
1804 default: |
1625 /* peer handled other queries */ |
1805 /* peer handled other queries */ |
1626 if (!gst_queue_peer_query (queue, queue->sinkpad, query)) |
1806 if (!gst_queue_peer_query (queue, queue->sinkpad, query)) |
1632 |
1812 |
1633 /* ERRORS */ |
1813 /* ERRORS */ |
1634 peer_failed: |
1814 peer_failed: |
1635 { |
1815 { |
1636 GST_DEBUG_OBJECT (queue, "failed peer query"); |
1816 GST_DEBUG_OBJECT (queue, "failed peer query"); |
1637 return FALSE; |
|
1638 } |
|
1639 flushing: |
|
1640 { |
|
1641 GST_DEBUG_OBJECT (queue, "flushing while waiting for query"); |
|
1642 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1643 return FALSE; |
1817 return FALSE; |
1644 } |
1818 } |
1645 } |
1819 } |
1646 |
1820 |
1647 static GstFlowReturn |
1821 static GstFlowReturn |
1679 { |
1853 { |
1680 GstQueue *queue; |
1854 GstQueue *queue; |
1681 gboolean ret; |
1855 gboolean ret; |
1682 |
1856 |
1683 queue = GST_QUEUE (gst_pad_get_parent (pad)); |
1857 queue = GST_QUEUE (gst_pad_get_parent (pad)); |
1858 |
|
1684 /* we can operate in pull mode when we are using a tempfile */ |
1859 /* we can operate in pull mode when we are using a tempfile */ |
1685 ret = QUEUE_IS_USING_TEMP_FILE (queue); |
1860 ret = QUEUE_IS_USING_TEMP_FILE (queue); |
1861 |
|
1686 gst_object_unref (GST_OBJECT (queue)); |
1862 gst_object_unref (GST_OBJECT (queue)); |
1687 |
1863 |
1688 return ret; |
1864 return ret; |
1689 } |
1865 } |
1690 |
1866 |
1854 * be able to preceed. |
2030 * be able to preceed. |
1855 */ |
2031 */ |
1856 #define QUEUE_THRESHOLD_CHANGE(q)\ |
2032 #define QUEUE_THRESHOLD_CHANGE(q)\ |
1857 g_cond_signal (queue->item_add); |
2033 g_cond_signal (queue->item_add); |
1858 |
2034 |
1859 static gboolean |
2035 static void |
1860 gst_queue_set_temp_location (GstQueue * queue, const gchar * location) |
2036 gst_queue_set_temp_template (GstQueue * queue, const gchar * template) |
1861 { |
2037 { |
1862 GstState state; |
2038 GstState state; |
1863 |
2039 |
1864 /* the element must be stopped in order to do this */ |
2040 /* the element must be stopped in order to do this */ |
1865 GST_OBJECT_LOCK (queue); |
2041 GST_OBJECT_LOCK (queue); |
1867 if (state != GST_STATE_READY && state != GST_STATE_NULL) |
2043 if (state != GST_STATE_READY && state != GST_STATE_NULL) |
1868 goto wrong_state; |
2044 goto wrong_state; |
1869 GST_OBJECT_UNLOCK (queue); |
2045 GST_OBJECT_UNLOCK (queue); |
1870 |
2046 |
1871 /* set new location */ |
2047 /* set new location */ |
1872 g_free (queue->temp_location); |
2048 g_free (queue->temp_template); |
1873 queue->temp_location = g_strdup (location); |
2049 queue->temp_template = g_strdup (template); |
1874 |
2050 |
1875 g_object_notify (G_OBJECT (queue), "temp-location"); |
2051 return; |
1876 |
|
1877 return TRUE; |
|
1878 |
2052 |
1879 /* ERROR */ |
2053 /* ERROR */ |
1880 wrong_state: |
2054 wrong_state: |
1881 { |
2055 { |
1882 GST_DEBUG_OBJECT (queue, "setting temp-location in wrong state"); |
2056 GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state"); |
1883 GST_OBJECT_UNLOCK (queue); |
2057 GST_OBJECT_UNLOCK (queue); |
1884 return FALSE; |
|
1885 } |
2058 } |
1886 } |
2059 } |
1887 |
2060 |
1888 static void |
2061 static void |
1889 gst_queue_set_property (GObject * object, |
2062 gst_queue_set_property (GObject * object, |
1921 queue->low_percent = g_value_get_int (value); |
2094 queue->low_percent = g_value_get_int (value); |
1922 break; |
2095 break; |
1923 case PROP_HIGH_PERCENT: |
2096 case PROP_HIGH_PERCENT: |
1924 queue->high_percent = g_value_get_int (value); |
2097 queue->high_percent = g_value_get_int (value); |
1925 break; |
2098 break; |
2099 case PROP_TEMP_TEMPLATE: |
|
2100 gst_queue_set_temp_template (queue, g_value_get_string (value)); |
|
2101 break; |
|
1926 case PROP_TEMP_LOCATION: |
2102 case PROP_TEMP_LOCATION: |
1927 gst_queue_set_temp_location (queue, g_value_dup_string (value)); |
2103 g_free (queue->temp_location); |
2104 queue->temp_location = g_value_dup_string (value); |
|
2105 /* you can set the property back to NULL to make it use the temp-tmpl |
|
2106 * property. */ |
|
2107 queue->temp_location_set = queue->temp_location != NULL; |
|
1928 break; |
2108 break; |
1929 default: |
2109 default: |
1930 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); |
2110 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); |
1931 break; |
2111 break; |
1932 } |
2112 } |
1970 case PROP_LOW_PERCENT: |
2150 case PROP_LOW_PERCENT: |
1971 g_value_set_int (value, queue->low_percent); |
2151 g_value_set_int (value, queue->low_percent); |
1972 break; |
2152 break; |
1973 case PROP_HIGH_PERCENT: |
2153 case PROP_HIGH_PERCENT: |
1974 g_value_set_int (value, queue->high_percent); |
2154 g_value_set_int (value, queue->high_percent); |
2155 break; |
|
2156 case PROP_TEMP_TEMPLATE: |
|
2157 g_value_set_string (value, queue->temp_template); |
|
1975 break; |
2158 break; |
1976 case PROP_TEMP_LOCATION: |
2159 case PROP_TEMP_LOCATION: |
1977 g_value_set_string (value, queue->temp_location); |
2160 g_value_set_string (value, queue->temp_location); |
1978 break; |
2161 break; |
1979 default: |
2162 default: |
1993 |
2176 |
1994 #ifdef ENABLE_NLS |
2177 #ifdef ENABLE_NLS |
1995 GST_DEBUG ("binding text domain %s to locale dir %s", GETTEXT_PACKAGE, |
2178 GST_DEBUG ("binding text domain %s to locale dir %s", GETTEXT_PACKAGE, |
1996 LOCALEDIR); |
2179 LOCALEDIR); |
1997 bindtextdomain (GETTEXT_PACKAGE, LOCALEDIR); |
2180 bindtextdomain (GETTEXT_PACKAGE, LOCALEDIR); |
2181 bind_textdomain_codeset (GETTEXT_PACKAGE, "UTF-8"); |
|
1998 #endif /* ENABLE_NLS */ |
2182 #endif /* ENABLE_NLS */ |
1999 |
2183 |
2000 return gst_element_register (plugin, "queue2", GST_RANK_NONE, GST_TYPE_QUEUE); |
2184 return gst_element_register (plugin, "queue2", GST_RANK_NONE, GST_TYPE_QUEUE); |
2001 } |
2185 } |
2002 |
2186 |