|
1 /* GStreamer |
|
2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu> |
|
3 * 2003 Colin Walters <cwalters@gnome.org> |
|
4 * 2000,2005,2007 Wim Taymans <wim.taymans@gmail.com> |
|
5 * 2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br> |
|
6 * |
|
7 * gstqueue2.c: |
|
8 * |
|
9 * This library is free software; you can redistribute it and/or |
|
10 * modify it under the terms of the GNU Library General Public |
|
11 * License as published by the Free Software Foundation; either |
|
12 * version 2 of the License, or (at your option) any later version. |
|
13 * |
|
14 * This library is distributed in the hope that it will be useful, |
|
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
|
17 * Library General Public License for more details. |
|
18 * |
|
19 * You should have received a copy of the GNU Library General Public |
|
20 * License along with this library; if not, write to the |
|
21 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, |
|
22 * Boston, MA 02111-1307, USA. |
|
23 */ |
|
24 |
|
25 /** |
|
26 * SECTION:element-queue2 |
|
27 * @short_description: Asynchronous data queue. |
|
28 * |
|
29 * Data is queued until one of the limits specified by the |
|
30 * #GstQueue:max-size-buffers, #GstQueue:max-size-bytes and/or |
|
31 * #GstQueue:max-size-time properties has been reached. Any attempt to push |
|
32 * more buffers into the queue will block the pushing thread until more space |
|
33 * becomes available. |
|
34 * |
|
35 * The queue will create a new thread on the source pad to decouple the |
|
36 * processing on sink and source pad. |
|
37 * |
|
38 * You can query how many buffers are queued by reading the |
|
39 * #GstQueue:current-level-buffers property. |
|
40 * |
|
41 * The default queue size limits are 100 buffers, 2MB of data, or |
|
42 * two seconds worth of data, whichever is reached first. |
|
43 * |
|
 * If you set temp-location, the element buffers data in the file that the
 * property names. In that mode the entire stream is stored in the file,
 * independently of the queue size limits; the limits are then used only
 * for buffering statistics.
|
48 * |
|
49 */ |
|
50 |
|
51 #ifdef HAVE_CONFIG_H |
|
52 #include "config.h" |
|
53 #endif |
|
54 |
|
55 #include <glib/gstdio.h> |
|
56 |
|
57 #include <gst/gst.h> |
|
58 #include <gst/gst-i18n-plugin.h> |
|
59 |
|
60 #ifdef __SYMBIAN32__ |
|
61 #include <glib_global.h> |
|
62 #endif |
|
/* Element metadata: long name, classification, description and authors. */
static const GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
    "Generic",
    "Simple data queue",
    "Erik Walthinsen <omega@cse.ogi.edu>, "
    "Wim Taymans <wim.taymans@gmail.com>");

/* Always-present sink pad accepting any caps. */
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

/* Always-present source pad producing any caps. */
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

/* queue_debug is the default category for the GST_DEBUG/GST_LOG calls in this
 * file; queue_dataflow is used explicitly by the STATUS() macro. */
GST_DEBUG_CATEGORY_STATIC (queue_debug);
#define GST_CAT_DEFAULT (queue_debug)
GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
|
82 |
|
/* No signals are defined; LAST_SIGNAL only marks the end of the (empty)
 * signal list. */
enum
{
  LAST_SIGNAL
};

/* default property values.
 * Every expression-valued macro is fully parenthesized so it can be used
 * inside a larger expression without operator-precedence surprises. */
#define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
#define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
#define DEFAULT_MAX_SIZE_TIME      (2 * GST_SECOND)     /* 2 seconds */
#define DEFAULT_USE_BUFFERING      FALSE
#define DEFAULT_USE_RATE_ESTIMATE  TRUE
#define DEFAULT_LOW_PERCENT        10
#define DEFAULT_HIGH_PERCENT       99

/* other defines */
#define DEFAULT_BUFFER_SIZE 4096
/* TRUE when a temp-location has been configured on @queue. */
#define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location != NULL)
|
100 |
|
/* GObject property identifiers. PROP_0 is a placeholder that is never
 * installed; the remaining ids are registered in gst_queue_class_init(). */
enum
{
  PROP_0,
  PROP_CUR_LEVEL_BUFFERS,
  PROP_CUR_LEVEL_BYTES,
  PROP_CUR_LEVEL_TIME,
  PROP_MAX_SIZE_BUFFERS,
  PROP_MAX_SIZE_BYTES,
  PROP_MAX_SIZE_TIME,
  PROP_USE_BUFFERING,
  PROP_USE_RATE_ESTIMATE,
  PROP_LOW_PERCENT,
  PROP_HIGH_PERCENT,
  PROP_TEMP_LOCATION
};
|
116 |
|
/* Standard GObject type, cast and type-check macros for GstQueue.
 * GST_QUEUE_CAST is a plain C cast without a runtime type check. */
#define GST_TYPE_QUEUE \
  (gst_queue_get_type())
#define GST_QUEUE(obj) \
  (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_QUEUE,GstQueue))
#define GST_QUEUE_CLASS(klass) \
  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_QUEUE,GstQueueClass))
#define GST_IS_QUEUE(obj) \
  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_QUEUE))
#define GST_IS_QUEUE_CLASS(klass) \
  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_QUEUE))
#define GST_QUEUE_CAST(obj) \
  ((GstQueue *)(obj))

/* Forward typedefs for the instance, level-tracking and class structures. */
typedef struct _GstQueue GstQueue;
typedef struct _GstQueueSize GstQueueSize;
typedef struct _GstQueueClass GstQueueClass;
|
133 |
|
134 /* used to keep track of sizes (current and max) */ |
|
135 struct _GstQueueSize |
|
136 { |
|
137 guint buffers; |
|
138 guint bytes; |
|
139 guint64 time; |
|
140 guint64 rate_time; |
|
141 }; |
|
142 |
|
143 #define GST_QUEUE_CLEAR_LEVEL(l) G_STMT_START { \ |
|
144 l.buffers = 0; \ |
|
145 l.bytes = 0; \ |
|
146 l.time = 0; \ |
|
147 l.rate_time = 0; \ |
|
148 } G_STMT_END |
|
149 |
|
/* Instance structure of the queue element. */
struct _GstQueue
{
  GstElement element;

  /*< private > */
  GstPad *sinkpad;
  GstPad *srcpad;

  /* segments to keep track of timestamps */
  GstSegment sink_segment;
  GstSegment src_segment;

  /* flowreturn when srcpad is paused */
  GstFlowReturn srcresult;
  gboolean is_eos;              /* when set, fill level is reported as 100% and
                                 * file reads never wait for more data */
  gboolean unexpected;          /* NOTE(review): not used in the visible code —
                                 * confirm meaning against the rest of the file */

  /* the queue of data we're keeping our hands on */
  GQueue *queue;

  GstQueueSize cur_level;       /* currently in the queue */
  GstQueueSize max_level;       /* max. amount of data allowed in the queue */
  gboolean use_buffering;       /* post buffering messages (use-buffering) */
  gboolean use_rate_estimate;   /* include rate_time in the fill percentage */
  GstClockTime buffering_interval;
  gint low_percent;             /* low/high watermarks for buffering */
  gint high_percent;

  /* current buffering state */
  gboolean is_buffering;
  guint buffering_iteration;    /* incremented each time buffering starts */

  /* for measuring input/output rates */
  GTimer *in_timer;
  gboolean in_timer_started;
  gdouble last_in_elapsed;      /* timer value at the last input rate update */
  guint64 bytes_in;             /* bytes counted since the last rate update */
  gdouble byte_in_rate;         /* averaged input rate, bytes per second */

  GTimer *out_timer;
  gboolean out_timer_started;
  gdouble last_out_elapsed;     /* timer value at the last output rate update */
  guint64 bytes_out;            /* bytes counted since the last rate update */
  gdouble byte_out_rate;        /* averaged output rate, bytes per second */

  GMutex *qlock;                /* lock for queue (vs object lock) */
  gboolean waiting_add;         /* TRUE while a thread waits on item_add */
  GCond *item_add;              /* signals buffers now available for reading */
  gboolean waiting_del;         /* TRUE while a thread waits on item_del */
  GCond *item_del;              /* signals space now available for writing */

  /* temp location stuff */
  gchar *temp_location;         /* non-NULL selects temp-file mode */
  FILE *temp_file;
  guint64 writing_pos;          /* file offset where the next buffer is written */
  guint64 reading_pos;          /* file offset just past the last data read */
  /* we need this to send the first new segment event of the stream
   * because we can't save it on the file */
  gboolean segment_event_received;
  GstEvent *starting_segment;

};

/* Class structure; adds nothing to GstElementClass. */
struct _GstQueueClass
{
  GstElementClass parent_class;
};
|
217 |
|
/* Log the current and maximum levels of @queue to the queue_dataflow debug
 * category, prefixed with the name of @pad and the literal string @msg.
 * The trailing "items" value is writing_pos - reading_pos in temp-file mode
 * and the GQueue length otherwise. @msg must be a string literal because it
 * is pasted into the format string. */
#define STATUS(queue, pad, msg) \
  GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
                      "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
                      "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
                      " ns, %"G_GUINT64_FORMAT" items", \
                      GST_DEBUG_PAD_NAME (pad), \
                      queue->cur_level.buffers, \
                      queue->max_level.buffers, \
                      queue->cur_level.bytes, \
                      queue->max_level.bytes, \
                      queue->cur_level.time, \
                      queue->max_level.time, \
                      (guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \
                        queue->writing_pos - queue->reading_pos : \
                        queue->queue->length))
|
233 |
|
/* Take the queue lock of @q. */
#define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START {                          \
  g_mutex_lock (q->qlock);                                              \
} G_STMT_END

/* Take the queue lock of @q and jump to @label, with the lock still held,
 * when srcresult is not GST_FLOW_OK. */
#define GST_QUEUE_MUTEX_LOCK_CHECK(q,label) G_STMT_START {              \
  GST_QUEUE_MUTEX_LOCK (q);                                             \
  if (q->srcresult != GST_FLOW_OK)                                      \
    goto label;                                                         \
} G_STMT_END

/* Release the queue lock of @q. */
#define GST_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                        \
  g_mutex_unlock (q->qlock);                                            \
} G_STMT_END
|
247 |
|
/* Wait on the item_del condition of @q until signalled, i.e. until space may
 * be available for writing. Must be used with the queue lock of @q held,
 * because g_cond_wait() releases and re-acquires that mutex. After waking up,
 * jumps to @label (lock still held) if srcresult is not GST_FLOW_OK.
 *
 * Only the macro parameter @q is referenced, so the macro no longer depends
 * on the caller having a variable that happens to be called "queue". */
#define GST_QUEUE_WAIT_DEL_CHECK(q, label) G_STMT_START {               \
  STATUS (q, q->sinkpad, "wait for DEL");                               \
  q->waiting_del = TRUE;                                                \
  g_cond_wait (q->item_del, q->qlock);                                  \
  q->waiting_del = FALSE;                                               \
  if (q->srcresult != GST_FLOW_OK) {                                    \
    STATUS (q, q->srcpad, "received DEL wakeup");                       \
    goto label;                                                         \
  }                                                                     \
  STATUS (q, q->sinkpad, "received DEL");                               \
} G_STMT_END

/* Wait on the item_add condition of @q until signalled, i.e. until data may
 * be available for reading. Same locking contract and @label behaviour as
 * GST_QUEUE_WAIT_DEL_CHECK. */
#define GST_QUEUE_WAIT_ADD_CHECK(q, label) G_STMT_START {               \
  STATUS (q, q->srcpad, "wait for ADD");                                \
  q->waiting_add = TRUE;                                                \
  g_cond_wait (q->item_add, q->qlock);                                  \
  q->waiting_add = FALSE;                                               \
  if (q->srcresult != GST_FLOW_OK) {                                    \
    STATUS (q, q->srcpad, "received ADD wakeup");                       \
    goto label;                                                         \
  }                                                                     \
  STATUS (q, q->srcpad, "received ADD");                                \
} G_STMT_END
|
271 |
|
/* Wake up a thread waiting on item_del of @q; does nothing unless
 * waiting_del is set. */
#define GST_QUEUE_SIGNAL_DEL(q) G_STMT_START {                          \
  if (q->waiting_del) {                                                 \
    STATUS (q, q->srcpad, "signal DEL");                                \
    g_cond_signal (q->item_del);                                        \
  }                                                                     \
} G_STMT_END

/* Wake up a thread waiting on item_add of @q; does nothing unless
 * waiting_add is set. */
#define GST_QUEUE_SIGNAL_ADD(q) G_STMT_START {                          \
  if (q->waiting_add) {                                                 \
    STATUS (q, q->sinkpad, "signal ADD");                               \
    g_cond_signal (q->item_add);                                        \
  }                                                                     \
} G_STMT_END
|
285 |
|
/* NOTE(review): _do_init expands to nothing (its continuation line is blank)
 * and is not used by any code visible here — presumably a leftover from the
 * boilerplate macro; confirm before removing. */
#define _do_init(bla) \

/* can't use boilerplate as we need to register with Queue2 to avoid conflicts
 * with queue in core elements */
static void gst_queue_class_init (GstQueueClass * klass);
static void gst_queue_init (GstQueue * queue, GstQueueClass * g_class);
/* parent class pointer, set in gst_queue_class_init() */
static GstElementClass *parent_class;
|
293 |
|
294 static GType |
|
295 gst_queue_get_type (void) |
|
296 { |
|
297 static GType gst_queue_type = 0; |
|
298 |
|
299 if (!gst_queue_type) { |
|
300 static const GTypeInfo gst_queue_info = { |
|
301 sizeof (GstQueueClass), |
|
302 NULL, |
|
303 NULL, |
|
304 (GClassInitFunc) gst_queue_class_init, |
|
305 NULL, |
|
306 NULL, |
|
307 sizeof (GstQueue), |
|
308 0, |
|
309 (GInstanceInitFunc) gst_queue_init, |
|
310 NULL |
|
311 }; |
|
312 |
|
313 gst_queue_type = |
|
314 g_type_register_static (GST_TYPE_ELEMENT, "GstQueue2", |
|
315 &gst_queue_info, 0); |
|
316 } |
|
317 return gst_queue_type; |
|
318 } |
|
319 |
|
/* Forward declarations of the functions wired up in gst_queue_class_init()
 * and gst_queue_init(), plus internal helpers. */

/* GObject vmethods */
static void gst_queue_finalize (GObject * object);

static void gst_queue_set_property (GObject * object,
    guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_queue_get_property (GObject * object,
    guint prop_id, GValue * value, GParamSpec * pspec);

/* data flow */
static GstFlowReturn gst_queue_chain (GstPad * pad, GstBuffer * buffer);
static GstFlowReturn gst_queue_bufferalloc (GstPad * pad, guint64 offset,
    guint size, GstCaps * caps, GstBuffer ** buf);
static GstFlowReturn gst_queue_push_one (GstQueue * queue);
static void gst_queue_loop (GstPad * pad);

/* events and queries */
static gboolean gst_queue_handle_sink_event (GstPad * pad, GstEvent * event);

static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event);
static gboolean gst_queue_handle_src_query (GstPad * pad, GstQuery * query);

static GstCaps *gst_queue_getcaps (GstPad * pad);

/* pull mode on the source pad */
static GstFlowReturn gst_queue_get_range (GstPad * pad, guint64 offset,
    guint length, GstBuffer ** buffer);
static gboolean gst_queue_src_checkgetrange_function (GstPad * pad);

/* pad activation and state changes */
static gboolean gst_queue_src_activate_pull (GstPad * pad, gboolean active);
static gboolean gst_queue_src_activate_push (GstPad * pad, gboolean active);
static gboolean gst_queue_sink_activate_push (GstPad * pad, gboolean active);
static GstStateChangeReturn gst_queue_change_state (GstElement * element,
    GstStateChange transition);

/* level checks */
static gboolean gst_queue_is_empty (GstQueue * queue);
static gboolean gst_queue_is_filled (GstQueue * queue);

/* static guint gst_queue_signals[LAST_SIGNAL] = { 0 }; */
|
354 |
|
355 static void |
|
356 gst_queue_class_init (GstQueueClass * klass) |
|
357 { |
|
358 GObjectClass *gobject_class = G_OBJECT_CLASS (klass); |
|
359 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); |
|
360 |
|
361 parent_class = g_type_class_peek_parent (klass); |
|
362 |
|
363 gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property); |
|
364 gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property); |
|
365 |
|
366 /* properties */ |
|
367 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES, |
|
368 g_param_spec_uint ("current-level-bytes", "Current level (kB)", |
|
369 "Current amount of data in the queue (bytes)", |
|
370 0, G_MAXUINT, 0, G_PARAM_READABLE)); |
|
371 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS, |
|
372 g_param_spec_uint ("current-level-buffers", "Current level (buffers)", |
|
373 "Current number of buffers in the queue", |
|
374 0, G_MAXUINT, 0, G_PARAM_READABLE)); |
|
375 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME, |
|
376 g_param_spec_uint64 ("current-level-time", "Current level (ns)", |
|
377 "Current amount of data in the queue (in ns)", |
|
378 0, G_MAXUINT64, 0, G_PARAM_READABLE)); |
|
379 |
|
380 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES, |
|
381 g_param_spec_uint ("max-size-bytes", "Max. size (kB)", |
|
382 "Max. amount of data in the queue (bytes, 0=disable)", |
|
383 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE)); |
|
384 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS, |
|
385 g_param_spec_uint ("max-size-buffers", "Max. size (buffers)", |
|
386 "Max. number of buffers in the queue (0=disable)", |
|
387 0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE)); |
|
388 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME, |
|
389 g_param_spec_uint64 ("max-size-time", "Max. size (ns)", |
|
390 "Max. amount of data in the queue (in ns, 0=disable)", |
|
391 0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE)); |
|
392 |
|
393 g_object_class_install_property (gobject_class, PROP_USE_BUFFERING, |
|
394 g_param_spec_boolean ("use-buffering", "Use buffering", |
|
395 "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds", |
|
396 DEFAULT_USE_BUFFERING, G_PARAM_READWRITE)); |
|
397 g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE, |
|
398 g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate", |
|
399 "Estimate the bitrate of the stream to calculate time level", |
|
400 DEFAULT_USE_RATE_ESTIMATE, G_PARAM_READWRITE)); |
|
401 g_object_class_install_property (gobject_class, PROP_LOW_PERCENT, |
|
402 g_param_spec_int ("low-percent", "Low percent", |
|
403 "Low threshold for buffering to start", |
|
404 0, 100, DEFAULT_LOW_PERCENT, G_PARAM_READWRITE)); |
|
405 g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT, |
|
406 g_param_spec_int ("high-percent", "High percent", |
|
407 "High threshold for buffering to finish", |
|
408 0, 100, DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE)); |
|
409 |
|
410 g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION, |
|
411 g_param_spec_string ("temp-location", "Temporary File Location", |
|
412 "Location of a temporary file to store data in", |
|
413 NULL, G_PARAM_READWRITE)); |
|
414 |
|
415 gst_element_class_add_pad_template (gstelement_class, |
|
416 gst_static_pad_template_get (&srctemplate)); |
|
417 gst_element_class_add_pad_template (gstelement_class, |
|
418 gst_static_pad_template_get (&sinktemplate)); |
|
419 |
|
420 gst_element_class_set_details (gstelement_class, &gst_queue_details); |
|
421 |
|
422 /* set several parent class virtual functions */ |
|
423 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_queue_finalize); |
|
424 |
|
425 gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue_change_state); |
|
426 } |
|
427 |
|
428 static void |
|
429 gst_queue_init (GstQueue * queue, GstQueueClass * g_class) |
|
430 { |
|
431 queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink"); |
|
432 |
|
433 gst_pad_set_chain_function (queue->sinkpad, |
|
434 GST_DEBUG_FUNCPTR (gst_queue_chain)); |
|
435 gst_pad_set_activatepush_function (queue->sinkpad, |
|
436 GST_DEBUG_FUNCPTR (gst_queue_sink_activate_push)); |
|
437 gst_pad_set_event_function (queue->sinkpad, |
|
438 GST_DEBUG_FUNCPTR (gst_queue_handle_sink_event)); |
|
439 gst_pad_set_getcaps_function (queue->sinkpad, |
|
440 GST_DEBUG_FUNCPTR (gst_queue_getcaps)); |
|
441 gst_pad_set_bufferalloc_function (queue->sinkpad, |
|
442 GST_DEBUG_FUNCPTR (gst_queue_bufferalloc)); |
|
443 gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad); |
|
444 |
|
445 queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src"); |
|
446 |
|
447 gst_pad_set_activatepull_function (queue->srcpad, |
|
448 GST_DEBUG_FUNCPTR (gst_queue_src_activate_pull)); |
|
449 gst_pad_set_activatepush_function (queue->srcpad, |
|
450 GST_DEBUG_FUNCPTR (gst_queue_src_activate_push)); |
|
451 gst_pad_set_getrange_function (queue->srcpad, |
|
452 GST_DEBUG_FUNCPTR (gst_queue_get_range)); |
|
453 gst_pad_set_checkgetrange_function (queue->srcpad, |
|
454 GST_DEBUG_FUNCPTR (gst_queue_src_checkgetrange_function)); |
|
455 gst_pad_set_getcaps_function (queue->srcpad, |
|
456 GST_DEBUG_FUNCPTR (gst_queue_getcaps)); |
|
457 gst_pad_set_event_function (queue->srcpad, |
|
458 GST_DEBUG_FUNCPTR (gst_queue_handle_src_event)); |
|
459 gst_pad_set_query_function (queue->srcpad, |
|
460 GST_DEBUG_FUNCPTR (gst_queue_handle_src_query)); |
|
461 gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad); |
|
462 |
|
463 /* levels */ |
|
464 GST_QUEUE_CLEAR_LEVEL (queue->cur_level); |
|
465 queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS; |
|
466 queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES; |
|
467 queue->max_level.time = DEFAULT_MAX_SIZE_TIME; |
|
468 queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME; |
|
469 queue->use_buffering = DEFAULT_USE_BUFFERING; |
|
470 queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE; |
|
471 queue->low_percent = DEFAULT_LOW_PERCENT; |
|
472 queue->high_percent = DEFAULT_HIGH_PERCENT; |
|
473 |
|
474 gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME); |
|
475 gst_segment_init (&queue->src_segment, GST_FORMAT_TIME); |
|
476 |
|
477 queue->srcresult = GST_FLOW_WRONG_STATE; |
|
478 queue->is_eos = FALSE; |
|
479 queue->in_timer = g_timer_new (); |
|
480 queue->out_timer = g_timer_new (); |
|
481 |
|
482 queue->qlock = g_mutex_new (); |
|
483 queue->waiting_add = FALSE; |
|
484 queue->item_add = g_cond_new (); |
|
485 queue->waiting_del = FALSE; |
|
486 queue->item_del = g_cond_new (); |
|
487 queue->queue = g_queue_new (); |
|
488 |
|
489 /* tempfile related */ |
|
490 queue->temp_location = NULL; |
|
491 queue->temp_file = NULL; |
|
492 |
|
493 GST_DEBUG_OBJECT (queue, |
|
494 "initialized queue's not_empty & not_full conditions"); |
|
495 } |
|
496 |
|
497 /* called only once, as opposed to dispose */ |
|
498 static void |
|
499 gst_queue_finalize (GObject * object) |
|
500 { |
|
501 GstQueue *queue = GST_QUEUE (object); |
|
502 |
|
503 GST_DEBUG_OBJECT (queue, "finalizing queue"); |
|
504 |
|
505 while (!g_queue_is_empty (queue->queue)) { |
|
506 GstMiniObject *data = g_queue_pop_head (queue->queue); |
|
507 |
|
508 gst_mini_object_unref (data); |
|
509 } |
|
510 |
|
511 g_queue_free (queue->queue); |
|
512 g_mutex_free (queue->qlock); |
|
513 g_cond_free (queue->item_add); |
|
514 g_cond_free (queue->item_del); |
|
515 g_timer_destroy (queue->in_timer); |
|
516 g_timer_destroy (queue->out_timer); |
|
517 |
|
518 /* temp_file path cleanup */ |
|
519 if (queue->temp_location != NULL) |
|
520 g_free (queue->temp_location); |
|
521 |
|
522 G_OBJECT_CLASS (parent_class)->finalize (object); |
|
523 } |
|
524 |
|
525 static GstCaps * |
|
526 gst_queue_getcaps (GstPad * pad) |
|
527 { |
|
528 GstQueue *queue; |
|
529 GstPad *otherpad; |
|
530 GstCaps *result; |
|
531 |
|
532 queue = GST_QUEUE (GST_PAD_PARENT (pad)); |
|
533 |
|
534 otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad); |
|
535 result = gst_pad_peer_get_caps (otherpad); |
|
536 if (result == NULL) |
|
537 result = gst_caps_new_any (); |
|
538 |
|
539 return result; |
|
540 } |
|
541 |
|
542 static GstFlowReturn |
|
543 gst_queue_bufferalloc (GstPad * pad, guint64 offset, guint size, GstCaps * caps, |
|
544 GstBuffer ** buf) |
|
545 { |
|
546 GstQueue *queue; |
|
547 GstFlowReturn result; |
|
548 |
|
549 queue = GST_QUEUE (GST_PAD_PARENT (pad)); |
|
550 |
|
551 /* Forward to src pad, without setting caps on the src pad */ |
|
552 result = gst_pad_alloc_buffer (queue->srcpad, offset, size, caps, buf); |
|
553 |
|
554 return result; |
|
555 } |
|
556 |
|
557 /* calculate the diff between running time on the sink and src of the queue. |
|
558 * This is the total amount of time in the queue. */ |
|
559 static void |
|
560 update_time_level (GstQueue * queue) |
|
561 { |
|
562 gint64 sink_time, src_time; |
|
563 |
|
564 sink_time = |
|
565 gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME, |
|
566 queue->sink_segment.last_stop); |
|
567 |
|
568 src_time = gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME, |
|
569 queue->src_segment.last_stop); |
|
570 |
|
571 GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, |
|
572 GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time)); |
|
573 |
|
574 if (sink_time >= src_time) |
|
575 queue->cur_level.time = sink_time - src_time; |
|
576 else |
|
577 queue->cur_level.time = 0; |
|
578 } |
|
579 |
|
580 /* take a NEWSEGMENT event and apply the values to segment, updating the time |
|
581 * level of queue. */ |
|
582 static void |
|
583 apply_segment (GstQueue * queue, GstEvent * event, GstSegment * segment) |
|
584 { |
|
585 gboolean update; |
|
586 GstFormat format; |
|
587 gdouble rate, arate; |
|
588 gint64 start, stop, time; |
|
589 |
|
590 gst_event_parse_new_segment_full (event, &update, &rate, &arate, |
|
591 &format, &start, &stop, &time); |
|
592 |
|
593 GST_DEBUG_OBJECT (queue, |
|
594 "received NEWSEGMENT update %d, rate %lf, applied rate %lf, " |
|
595 "format %d, " |
|
596 "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %" |
|
597 G_GINT64_FORMAT, update, rate, arate, format, start, stop, time); |
|
598 |
|
599 if (format == GST_FORMAT_BYTES) { |
|
600 } |
|
601 |
|
602 /* now configure the values, we use these to track timestamps on the |
|
603 * sinkpad. */ |
|
604 if (format != GST_FORMAT_TIME) { |
|
605 /* non-time format, pretent the current time segment is closed with a |
|
606 * 0 start and unknown stop time. */ |
|
607 update = FALSE; |
|
608 format = GST_FORMAT_TIME; |
|
609 start = 0; |
|
610 stop = -1; |
|
611 time = 0; |
|
612 } |
|
613 gst_segment_set_newsegment_full (segment, update, |
|
614 rate, arate, format, start, stop, time); |
|
615 |
|
616 GST_DEBUG_OBJECT (queue, |
|
617 "configured NEWSEGMENT %" GST_SEGMENT_FORMAT, segment); |
|
618 |
|
619 /* segment can update the time level of the queue */ |
|
620 update_time_level (queue); |
|
621 } |
|
622 |
|
623 /* take a buffer and update segment, updating the time level of the queue. */ |
|
624 static void |
|
625 apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment) |
|
626 { |
|
627 GstClockTime duration, timestamp; |
|
628 |
|
629 timestamp = GST_BUFFER_TIMESTAMP (buffer); |
|
630 duration = GST_BUFFER_DURATION (buffer); |
|
631 |
|
632 /* if no timestamp is set, assume it's continuous with the previous |
|
633 * time */ |
|
634 if (timestamp == GST_CLOCK_TIME_NONE) |
|
635 timestamp = segment->last_stop; |
|
636 |
|
637 /* add duration */ |
|
638 if (duration != GST_CLOCK_TIME_NONE) |
|
639 timestamp += duration; |
|
640 |
|
641 GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT, |
|
642 GST_TIME_ARGS (timestamp)); |
|
643 |
|
644 gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp); |
|
645 |
|
646 /* calc diff with other end */ |
|
647 update_time_level (queue); |
|
648 } |
|
649 |
|
650 static void |
|
651 update_buffering (GstQueue * queue) |
|
652 { |
|
653 gint percent; |
|
654 gboolean post = FALSE; |
|
655 |
|
656 if (!queue->use_buffering || queue->high_percent <= 0) |
|
657 return; |
|
658 |
|
659 #define GET_PERCENT(format) ((queue->max_level.format) > 0 ? \ |
|
660 (queue->cur_level.format) * 100 / (queue->max_level.format) : 0) |
|
661 |
|
662 if (queue->is_eos) { |
|
663 /* on EOS we are always 100% full, we set the var here so that it we can |
|
664 * resue the logic below to stop buffering */ |
|
665 percent = 100; |
|
666 } else { |
|
667 /* figure out the percent we are filled, we take the max of all formats. */ |
|
668 percent = GET_PERCENT (bytes); |
|
669 percent = MAX (percent, GET_PERCENT (time)); |
|
670 percent = MAX (percent, GET_PERCENT (buffers)); |
|
671 |
|
672 /* also apply the rate estimate when we need to */ |
|
673 if (queue->use_rate_estimate) |
|
674 percent = MAX (percent, GET_PERCENT (rate_time)); |
|
675 } |
|
676 |
|
677 if (queue->is_buffering) { |
|
678 post = TRUE; |
|
679 /* if we were buffering see if we reached the high watermark */ |
|
680 if (percent >= queue->high_percent) |
|
681 queue->is_buffering = FALSE; |
|
682 } else { |
|
683 /* we were not buffering, check if we need to start buffering if we drop |
|
684 * below the low threshold */ |
|
685 if (percent < queue->low_percent) { |
|
686 queue->is_buffering = TRUE; |
|
687 queue->buffering_iteration++; |
|
688 post = TRUE; |
|
689 } |
|
690 } |
|
691 if (post) { |
|
692 /* scale to high percent so that it becomes the 100% mark */ |
|
693 percent = percent * 100 / queue->high_percent; |
|
694 /* clip */ |
|
695 if (percent > 100) |
|
696 percent = 100; |
|
697 |
|
698 GST_DEBUG_OBJECT (queue, "buffering %d percent", percent); |
|
699 gst_element_post_message (GST_ELEMENT_CAST (queue), |
|
700 gst_message_new_buffering (GST_OBJECT_CAST (queue), percent)); |
|
701 } else { |
|
702 GST_DEBUG_OBJECT (queue, "filled %d percent", percent); |
|
703 } |
|
704 |
|
705 #undef GET_PERCENT |
|
706 } |
|
707 |
|
708 static void |
|
709 reset_rate_timer (GstQueue * queue) |
|
710 { |
|
711 queue->bytes_in = 0; |
|
712 queue->bytes_out = 0; |
|
713 queue->byte_in_rate = 0.0; |
|
714 queue->byte_out_rate = 0.0; |
|
715 queue->last_in_elapsed = 0.0; |
|
716 queue->last_out_elapsed = 0.0; |
|
717 queue->in_timer_started = FALSE; |
|
718 queue->out_timer_started = FALSE; |
|
719 } |
|
720 |
|
/* the interval in seconds to recalculate the rate */
#define RATE_INTERVAL    0.2
/* Tuning for rate estimation. We use a large window for the input rate because
 * it should be stable when connected to a network. The output rate is less
 * stable (the elements preroll, queues behind a demuxer fill, ...) and should
 * therefore adapt more quickly.
 * Both macros compute a weighted moving average of the previous average @avg
 * and the new sample @val; the whole expansion is parenthesized so the macros
 * can be used as operands of a larger expression. */
#define AVG_IN(avg,val)  ((((avg) * 15.0) + (val)) / 16.0)
#define AVG_OUT(avg,val) ((((avg) * 3.0) + (val)) / 4.0)
|
729 |
|
730 static void |
|
731 update_in_rates (GstQueue * queue) |
|
732 { |
|
733 gdouble elapsed, period; |
|
734 gdouble byte_in_rate; |
|
735 |
|
736 if (!queue->in_timer_started) { |
|
737 queue->in_timer_started = TRUE; |
|
738 g_timer_start (queue->in_timer); |
|
739 return; |
|
740 } |
|
741 |
|
742 elapsed = g_timer_elapsed (queue->in_timer, NULL); |
|
743 |
|
744 /* recalc after each interval. */ |
|
745 if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) { |
|
746 period = elapsed - queue->last_in_elapsed; |
|
747 |
|
748 GST_DEBUG_OBJECT (queue, |
|
749 "rates: period %f, in %" G_GUINT64_FORMAT, period, queue->bytes_in); |
|
750 |
|
751 byte_in_rate = queue->bytes_in / period; |
|
752 |
|
753 if (queue->byte_in_rate == 0.0) |
|
754 queue->byte_in_rate = byte_in_rate; |
|
755 else |
|
756 queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate); |
|
757 |
|
758 /* reset the values to calculate rate over the next interval */ |
|
759 queue->last_in_elapsed = elapsed; |
|
760 queue->bytes_in = 0; |
|
761 } |
|
762 |
|
763 if (queue->byte_in_rate > 0.0) { |
|
764 queue->cur_level.rate_time = |
|
765 queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND; |
|
766 } |
|
767 GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT, |
|
768 queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time)); |
|
769 } |
|
770 |
|
771 static void |
|
772 update_out_rates (GstQueue * queue) |
|
773 { |
|
774 gdouble elapsed, period; |
|
775 gdouble byte_out_rate; |
|
776 |
|
777 if (!queue->out_timer_started) { |
|
778 queue->out_timer_started = TRUE; |
|
779 g_timer_start (queue->out_timer); |
|
780 return; |
|
781 } |
|
782 |
|
783 elapsed = g_timer_elapsed (queue->out_timer, NULL); |
|
784 |
|
785 /* recalc after each interval. */ |
|
786 if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) { |
|
787 period = elapsed - queue->last_out_elapsed; |
|
788 |
|
789 GST_DEBUG_OBJECT (queue, |
|
790 "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out); |
|
791 |
|
792 byte_out_rate = queue->bytes_out / period; |
|
793 |
|
794 if (queue->byte_out_rate == 0.0) |
|
795 queue->byte_out_rate = byte_out_rate; |
|
796 else |
|
797 queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate); |
|
798 |
|
799 /* reset the values to calculate rate over the next interval */ |
|
800 queue->last_out_elapsed = elapsed; |
|
801 queue->bytes_out = 0; |
|
802 } |
|
803 GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT, |
|
804 queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time)); |
|
805 } |
|
806 |
|
807 static void |
|
808 gst_queue_write_buffer_to_file (GstQueue * queue, GstBuffer * buffer) |
|
809 { |
|
810 guint size; |
|
811 guint8 *data; |
|
812 int ret; |
|
813 |
|
814 fseek (queue->temp_file, queue->writing_pos, SEEK_SET); |
|
815 |
|
816 data = GST_BUFFER_DATA (buffer); |
|
817 size = GST_BUFFER_SIZE (buffer); |
|
818 |
|
819 ret = fwrite (data, 1, size, queue->temp_file); |
|
820 if (ret < size) { |
|
821 /* FIXME do something useful here */ |
|
822 GST_ERROR_OBJECT (queue, "fwrite returned error"); |
|
823 } |
|
824 queue->writing_pos += size; |
|
825 } |
|
826 |
|
827 /* see if there is enough data in the file to read a full buffer */ |
|
828 static gboolean |
|
829 gst_queue_have_data (GstQueue * queue, guint64 offset, guint length) |
|
830 { |
|
831 GST_DEBUG_OBJECT (queue, |
|
832 "offset %" G_GUINT64_FORMAT ", len %u, write %" G_GUINT64_FORMAT, offset, |
|
833 length, queue->writing_pos); |
|
834 if (queue->is_eos) |
|
835 return TRUE; |
|
836 |
|
837 if (offset + length < queue->writing_pos) |
|
838 return TRUE; |
|
839 |
|
840 return FALSE; |
|
841 } |
|
842 |
|
843 static GstFlowReturn |
|
844 gst_queue_create_read (GstQueue * queue, guint64 offset, guint length, |
|
845 GstBuffer ** buffer) |
|
846 { |
|
847 size_t res; |
|
848 GstBuffer *buf; |
|
849 |
|
850 /* check if we have enough data at @offset. If there is not enough data, we |
|
851 * block and wait. */ |
|
852 while (!gst_queue_have_data (queue, offset, length)) { |
|
853 GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing); |
|
854 } |
|
855 |
|
856 #ifdef HAVE_FSEEKO |
|
857 if (fseeko (queue->temp_file, (off_t) offset, SEEK_SET) != 0) |
|
858 goto seek_failed; |
|
859 #elif defined (G_OS_UNIX) |
|
860 if (lseek (fileno (queue->temp_file), (off_t) offset, |
|
861 SEEK_SET) == (off_t) - 1) |
|
862 goto seek_failed; |
|
863 #else |
|
864 if (fseek (queue->temp_file, (long) offset, SEEK_SET) != 0) |
|
865 goto seek_failed; |
|
866 #endif |
|
867 |
|
868 buf = gst_buffer_new_and_alloc (length); |
|
869 |
|
870 /* this should not block */ |
|
871 GST_LOG_OBJECT (queue, "Reading %d bytes", length); |
|
872 res = fread (GST_BUFFER_DATA (buf), 1, length, queue->temp_file); |
|
873 GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res); |
|
874 |
|
875 if (G_UNLIKELY (res == 0)) { |
|
876 /* check for errors or EOF */ |
|
877 if (ferror (queue->temp_file)) |
|
878 goto could_not_read; |
|
879 if (feof (queue->temp_file) && length > 0) |
|
880 goto eos; |
|
881 } |
|
882 |
|
883 length = res; |
|
884 |
|
885 GST_BUFFER_SIZE (buf) = length; |
|
886 GST_BUFFER_OFFSET (buf) = offset; |
|
887 GST_BUFFER_OFFSET_END (buf) = offset + length; |
|
888 |
|
889 *buffer = buf; |
|
890 |
|
891 queue->reading_pos = offset + length; |
|
892 |
|
893 return GST_FLOW_OK; |
|
894 |
|
895 /* ERRORS */ |
|
896 out_flushing: |
|
897 { |
|
898 GST_DEBUG_OBJECT (queue, "we are flushing"); |
|
899 return GST_FLOW_WRONG_STATE; |
|
900 } |
|
901 seek_failed: |
|
902 { |
|
903 GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); |
|
904 return GST_FLOW_ERROR; |
|
905 } |
|
906 could_not_read: |
|
907 { |
|
908 GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM); |
|
909 gst_buffer_unref (buf); |
|
910 return GST_FLOW_ERROR; |
|
911 } |
|
912 eos: |
|
913 { |
|
914 GST_DEBUG ("non-regular file hits EOS"); |
|
915 gst_buffer_unref (buf); |
|
916 return GST_FLOW_UNEXPECTED; |
|
917 } |
|
918 } |
|
919 |
|
920 /* should be called with QUEUE_LOCK */ |
|
921 static GstMiniObject * |
|
922 gst_queue_read_item_from_file (GstQueue * queue) |
|
923 { |
|
924 GstMiniObject *item; |
|
925 |
|
926 if (queue->starting_segment != NULL) { |
|
927 item = GST_MINI_OBJECT_CAST (queue->starting_segment); |
|
928 queue->starting_segment = NULL; |
|
929 } else { |
|
930 GstFlowReturn ret; |
|
931 GstBuffer *buffer; |
|
932 |
|
933 ret = |
|
934 gst_queue_create_read (queue, queue->reading_pos, DEFAULT_BUFFER_SIZE, |
|
935 &buffer); |
|
936 switch (ret) { |
|
937 case GST_FLOW_OK: |
|
938 item = GST_MINI_OBJECT_CAST (buffer); |
|
939 break; |
|
940 case GST_FLOW_UNEXPECTED: |
|
941 item = GST_MINI_OBJECT_CAST (gst_event_new_eos ()); |
|
942 break; |
|
943 default: |
|
944 item = NULL; |
|
945 break; |
|
946 } |
|
947 } |
|
948 return item; |
|
949 } |
|
950 |
|
951 static gboolean |
|
952 gst_queue_open_temp_location_file (GstQueue * queue) |
|
953 { |
|
954 /* nothing to do */ |
|
955 if (queue->temp_location == NULL) |
|
956 goto no_filename; |
|
957 |
|
958 /* open the file for update/writing */ |
|
959 queue->temp_file = g_fopen (queue->temp_location, "wb+"); |
|
960 /* error creating file */ |
|
961 if (queue->temp_file == NULL) |
|
962 goto open_failed; |
|
963 |
|
964 queue->writing_pos = 0; |
|
965 queue->reading_pos = 0; |
|
966 |
|
967 return TRUE; |
|
968 |
|
969 /* ERRORS */ |
|
970 no_filename: |
|
971 { |
|
972 GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND, |
|
973 (_("No file name specified.")), (NULL)); |
|
974 return FALSE; |
|
975 } |
|
976 open_failed: |
|
977 { |
|
978 GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ, |
|
979 (_("Could not open file \"%s\" for reading."), queue->temp_location), |
|
980 GST_ERROR_SYSTEM); |
|
981 return FALSE; |
|
982 } |
|
983 } |
|
984 |
|
985 static void |
|
986 gst_queue_close_temp_location_file (GstQueue * queue) |
|
987 { |
|
988 /* nothing to do */ |
|
989 if (queue->temp_file == NULL) |
|
990 return; |
|
991 |
|
992 /* we don't remove the file so that the application can use it as a cache |
|
993 * later on */ |
|
994 fflush (queue->temp_file); |
|
995 fclose (queue->temp_file); |
|
996 remove (queue->temp_location); |
|
997 queue->temp_file = NULL; |
|
998 } |
|
999 |
|
1000 static void |
|
1001 gst_queue_locked_flush (GstQueue * queue) |
|
1002 { |
|
1003 if (QUEUE_IS_USING_TEMP_FILE (queue)) { |
|
1004 gst_queue_close_temp_location_file (queue); |
|
1005 gst_queue_open_temp_location_file (queue); |
|
1006 } else { |
|
1007 while (!g_queue_is_empty (queue->queue)) { |
|
1008 GstMiniObject *data = g_queue_pop_head (queue->queue); |
|
1009 |
|
1010 /* Then lose another reference because we are supposed to destroy that |
|
1011 data when flushing */ |
|
1012 gst_mini_object_unref (data); |
|
1013 } |
|
1014 } |
|
1015 GST_QUEUE_CLEAR_LEVEL (queue->cur_level); |
|
1016 gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME); |
|
1017 gst_segment_init (&queue->src_segment, GST_FORMAT_TIME); |
|
1018 if (queue->starting_segment != NULL) |
|
1019 gst_event_unref (queue->starting_segment); |
|
1020 queue->starting_segment = NULL; |
|
1021 queue->segment_event_received = FALSE; |
|
1022 |
|
1023 /* we deleted a lot of something */ |
|
1024 GST_QUEUE_SIGNAL_DEL (queue); |
|
1025 } |
|
1026 |
|
1027 /* enqueue an item an update the level stats */ |
|
1028 static void |
|
1029 gst_queue_locked_enqueue (GstQueue * queue, gpointer item) |
|
1030 { |
|
1031 if (GST_IS_BUFFER (item)) { |
|
1032 GstBuffer *buffer; |
|
1033 guint size; |
|
1034 |
|
1035 buffer = GST_BUFFER_CAST (item); |
|
1036 size = GST_BUFFER_SIZE (buffer); |
|
1037 |
|
1038 /* add buffer to the statistics */ |
|
1039 queue->cur_level.buffers++; |
|
1040 queue->cur_level.bytes += size; |
|
1041 queue->bytes_in += size; |
|
1042 /* apply new buffer to segment stats */ |
|
1043 apply_buffer (queue, buffer, &queue->sink_segment); |
|
1044 /* update the byterate stats */ |
|
1045 update_in_rates (queue); |
|
1046 |
|
1047 if (QUEUE_IS_USING_TEMP_FILE (queue)) { |
|
1048 gst_queue_write_buffer_to_file (queue, buffer); |
|
1049 } |
|
1050 |
|
1051 } else if (GST_IS_EVENT (item)) { |
|
1052 GstEvent *event; |
|
1053 |
|
1054 event = GST_EVENT_CAST (item); |
|
1055 |
|
1056 switch (GST_EVENT_TYPE (event)) { |
|
1057 case GST_EVENT_EOS: |
|
1058 /* Zero the thresholds, this makes sure the queue is completely |
|
1059 * filled and we can read all data from the queue. */ |
|
1060 queue->is_eos = TRUE; |
|
1061 break; |
|
1062 case GST_EVENT_NEWSEGMENT: |
|
1063 apply_segment (queue, event, &queue->sink_segment); |
|
1064 /* This is our first new segment, we hold it |
|
1065 * as we can't save it on the temp file */ |
|
1066 if (QUEUE_IS_USING_TEMP_FILE (queue)) { |
|
1067 if (queue->segment_event_received) |
|
1068 goto unexpected_event; |
|
1069 |
|
1070 queue->segment_event_received = TRUE; |
|
1071 queue->starting_segment = event; |
|
1072 } |
|
1073 /* a new segment allows us to accept more buffers if we got UNEXPECTED |
|
1074 * from downstream */ |
|
1075 queue->unexpected = FALSE; |
|
1076 break; |
|
1077 default: |
|
1078 if (QUEUE_IS_USING_TEMP_FILE (queue)) |
|
1079 goto unexpected_event; |
|
1080 break; |
|
1081 } |
|
1082 } else { |
|
1083 g_warning ("Unexpected item %p added in queue %s (refcounting problem?)", |
|
1084 item, GST_OBJECT_NAME (queue)); |
|
1085 /* we can't really unref since we don't know what it is */ |
|
1086 item = NULL; |
|
1087 } |
|
1088 |
|
1089 if (item) { |
|
1090 /* update the buffering status */ |
|
1091 update_buffering (queue); |
|
1092 |
|
1093 if (!QUEUE_IS_USING_TEMP_FILE (queue)) |
|
1094 g_queue_push_tail (queue->queue, item); |
|
1095 GST_QUEUE_SIGNAL_ADD (queue); |
|
1096 } |
|
1097 |
|
1098 return; |
|
1099 |
|
1100 /* ERRORS */ |
|
1101 unexpected_event: |
|
1102 { |
|
1103 g_warning |
|
1104 ("Unexpected event of kind %s can't be added in temp file of queue %s ", |
|
1105 gst_event_type_get_name (GST_EVENT_TYPE (item)), |
|
1106 GST_OBJECT_NAME (queue)); |
|
1107 gst_event_unref (GST_EVENT_CAST (item)); |
|
1108 return; |
|
1109 } |
|
1110 } |
|
1111 |
|
1112 /* dequeue an item from the queue and update level stats */ |
|
1113 static GstMiniObject * |
|
1114 gst_queue_locked_dequeue (GstQueue * queue) |
|
1115 { |
|
1116 GstMiniObject *item; |
|
1117 |
|
1118 if (QUEUE_IS_USING_TEMP_FILE (queue)) |
|
1119 item = gst_queue_read_item_from_file (queue); |
|
1120 else |
|
1121 item = g_queue_pop_head (queue->queue); |
|
1122 |
|
1123 if (item == NULL) |
|
1124 goto no_item; |
|
1125 |
|
1126 if (GST_IS_BUFFER (item)) { |
|
1127 GstBuffer *buffer; |
|
1128 guint size; |
|
1129 |
|
1130 buffer = GST_BUFFER_CAST (item); |
|
1131 size = GST_BUFFER_SIZE (buffer); |
|
1132 |
|
1133 GST_CAT_LOG_OBJECT (queue_dataflow, queue, |
|
1134 "retrieved buffer %p from queue", buffer); |
|
1135 |
|
1136 queue->cur_level.buffers--; |
|
1137 queue->cur_level.bytes -= size; |
|
1138 queue->bytes_out += size; |
|
1139 apply_buffer (queue, buffer, &queue->src_segment); |
|
1140 /* update the byterate stats */ |
|
1141 update_out_rates (queue); |
|
1142 /* update the buffering */ |
|
1143 update_buffering (queue); |
|
1144 |
|
1145 } else if (GST_IS_EVENT (item)) { |
|
1146 GstEvent *event = GST_EVENT_CAST (item); |
|
1147 |
|
1148 GST_CAT_LOG_OBJECT (queue_dataflow, queue, |
|
1149 "retrieved event %p from queue", event); |
|
1150 |
|
1151 switch (GST_EVENT_TYPE (event)) { |
|
1152 case GST_EVENT_EOS: |
|
1153 /* queue is empty now that we dequeued the EOS */ |
|
1154 GST_QUEUE_CLEAR_LEVEL (queue->cur_level); |
|
1155 break; |
|
1156 case GST_EVENT_NEWSEGMENT: |
|
1157 apply_segment (queue, event, &queue->src_segment); |
|
1158 break; |
|
1159 default: |
|
1160 break; |
|
1161 } |
|
1162 } else { |
|
1163 g_warning |
|
1164 ("Unexpected item %p dequeued from queue %s (refcounting problem?)", |
|
1165 item, GST_OBJECT_NAME (queue)); |
|
1166 item = NULL; |
|
1167 } |
|
1168 GST_QUEUE_SIGNAL_DEL (queue); |
|
1169 |
|
1170 return item; |
|
1171 |
|
1172 /* ERRORS */ |
|
1173 no_item: |
|
1174 { |
|
1175 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty"); |
|
1176 return NULL; |
|
1177 } |
|
1178 } |
|
1179 |
|
1180 static gboolean |
|
1181 gst_queue_handle_sink_event (GstPad * pad, GstEvent * event) |
|
1182 { |
|
1183 GstQueue *queue; |
|
1184 |
|
1185 queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); |
|
1186 |
|
1187 switch (GST_EVENT_TYPE (event)) { |
|
1188 case GST_EVENT_FLUSH_START: |
|
1189 { |
|
1190 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush start event"); |
|
1191 /* forward event */ |
|
1192 gst_pad_push_event (queue->srcpad, event); |
|
1193 |
|
1194 /* now unblock the chain function */ |
|
1195 GST_QUEUE_MUTEX_LOCK (queue); |
|
1196 queue->srcresult = GST_FLOW_WRONG_STATE; |
|
1197 /* unblock the loop and chain functions */ |
|
1198 g_cond_signal (queue->item_add); |
|
1199 g_cond_signal (queue->item_del); |
|
1200 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1201 |
|
1202 /* make sure it pauses, this should happen since we sent |
|
1203 * flush_start downstream. */ |
|
1204 gst_pad_pause_task (queue->srcpad); |
|
1205 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped"); |
|
1206 goto done; |
|
1207 } |
|
1208 case GST_EVENT_FLUSH_STOP: |
|
1209 { |
|
1210 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received flush stop event"); |
|
1211 /* forward event */ |
|
1212 gst_pad_push_event (queue->srcpad, event); |
|
1213 |
|
1214 GST_QUEUE_MUTEX_LOCK (queue); |
|
1215 gst_queue_locked_flush (queue); |
|
1216 queue->srcresult = GST_FLOW_OK; |
|
1217 queue->is_eos = FALSE; |
|
1218 queue->unexpected = FALSE; |
|
1219 /* reset rate counters */ |
|
1220 reset_rate_timer (queue); |
|
1221 gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop, |
|
1222 queue->srcpad); |
|
1223 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1224 goto done; |
|
1225 } |
|
1226 default: |
|
1227 if (GST_EVENT_IS_SERIALIZED (event)) { |
|
1228 /* serialized events go in the queue */ |
|
1229 GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); |
|
1230 /* refuse more events on EOS */ |
|
1231 if (queue->is_eos) |
|
1232 goto out_eos; |
|
1233 gst_queue_locked_enqueue (queue, event); |
|
1234 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1235 } else { |
|
1236 /* non-serialized events are passed upstream. */ |
|
1237 gst_pad_push_event (queue->srcpad, event); |
|
1238 } |
|
1239 break; |
|
1240 } |
|
1241 done: |
|
1242 return TRUE; |
|
1243 |
|
1244 /* ERRORS */ |
|
1245 out_flushing: |
|
1246 { |
|
1247 GST_DEBUG_OBJECT (queue, "refusing event, we are flushing"); |
|
1248 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1249 gst_event_unref (event); |
|
1250 return FALSE; |
|
1251 } |
|
1252 out_eos: |
|
1253 { |
|
1254 GST_DEBUG_OBJECT (queue, "refusing event, we are EOS"); |
|
1255 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1256 gst_event_unref (event); |
|
1257 return FALSE; |
|
1258 } |
|
1259 } |
|
1260 |
|
1261 static gboolean |
|
1262 gst_queue_is_empty (GstQueue * queue) |
|
1263 { |
|
1264 /* never empty on EOS */ |
|
1265 if (queue->is_eos) |
|
1266 return FALSE; |
|
1267 |
|
1268 if (QUEUE_IS_USING_TEMP_FILE (queue)) { |
|
1269 return queue->writing_pos == queue->reading_pos; |
|
1270 } else { |
|
1271 if (queue->queue->length == 0) |
|
1272 return TRUE; |
|
1273 } |
|
1274 |
|
1275 return FALSE; |
|
1276 } |
|
1277 |
|
1278 static gboolean |
|
1279 gst_queue_is_filled (GstQueue * queue) |
|
1280 { |
|
1281 gboolean res; |
|
1282 |
|
1283 /* always filled on EOS */ |
|
1284 if (queue->is_eos) |
|
1285 return TRUE; |
|
1286 |
|
1287 /* if using file, we're never filled if we don't have EOS */ |
|
1288 if (QUEUE_IS_USING_TEMP_FILE (queue)) |
|
1289 return FALSE; |
|
1290 |
|
1291 #define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \ |
|
1292 (queue->cur_level.format) >= (queue->max_level.format)) |
|
1293 |
|
1294 /* we are filled if one of the current levels exceeds the max */ |
|
1295 res = CHECK_FILLED (buffers) || CHECK_FILLED (bytes) || CHECK_FILLED (time); |
|
1296 |
|
1297 /* if we need to, use the rate estimate to check against the max time we are |
|
1298 * allowed to queue */ |
|
1299 if (queue->use_rate_estimate) |
|
1300 res |= CHECK_FILLED (rate_time); |
|
1301 |
|
1302 #undef CHECK_FILLED |
|
1303 return res; |
|
1304 } |
|
1305 |
|
1306 static GstFlowReturn |
|
1307 gst_queue_chain (GstPad * pad, GstBuffer * buffer) |
|
1308 { |
|
1309 GstQueue *queue; |
|
1310 |
|
1311 queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); |
|
1312 |
|
1313 GST_CAT_LOG_OBJECT (queue_dataflow, queue, |
|
1314 "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %" |
|
1315 GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer), |
|
1316 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)), |
|
1317 GST_TIME_ARGS (GST_BUFFER_DURATION (buffer))); |
|
1318 |
|
1319 /* we have to lock the queue since we span threads */ |
|
1320 GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); |
|
1321 /* when we received EOS, we refuse more data */ |
|
1322 if (queue->is_eos) |
|
1323 goto out_eos; |
|
1324 /* when we received unexpected from downstream, refuse more buffers */ |
|
1325 if (queue->unexpected) |
|
1326 goto out_unexpected; |
|
1327 |
|
1328 /* We make space available if we're "full" according to whatever |
|
1329 * the user defined as "full". */ |
|
1330 if (gst_queue_is_filled (queue)) { |
|
1331 gboolean started; |
|
1332 |
|
1333 /* pause the timer while we wait. The fact that we are waiting does not mean |
|
1334 * the byterate on the input pad is lower */ |
|
1335 if ((started = queue->in_timer_started)) |
|
1336 g_timer_stop (queue->in_timer); |
|
1337 |
|
1338 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, |
|
1339 "queue is full, waiting for free space"); |
|
1340 do { |
|
1341 /* Wait for space to be available, we could be unlocked because of a flush. */ |
|
1342 GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing); |
|
1343 } |
|
1344 while (gst_queue_is_filled (queue)); |
|
1345 |
|
1346 /* and continue if we were running before */ |
|
1347 if (started) |
|
1348 g_timer_continue (queue->in_timer); |
|
1349 } |
|
1350 |
|
1351 /* put buffer in queue now */ |
|
1352 gst_queue_locked_enqueue (queue, buffer); |
|
1353 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1354 |
|
1355 return GST_FLOW_OK; |
|
1356 |
|
1357 /* special conditions */ |
|
1358 out_flushing: |
|
1359 { |
|
1360 GstFlowReturn ret = queue->srcresult; |
|
1361 |
|
1362 GST_CAT_LOG_OBJECT (queue_dataflow, queue, |
|
1363 "exit because task paused, reason: %s", gst_flow_get_name (ret)); |
|
1364 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1365 gst_buffer_unref (buffer); |
|
1366 |
|
1367 return ret; |
|
1368 } |
|
1369 out_eos: |
|
1370 { |
|
1371 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS"); |
|
1372 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1373 gst_buffer_unref (buffer); |
|
1374 |
|
1375 return GST_FLOW_UNEXPECTED; |
|
1376 } |
|
1377 out_unexpected: |
|
1378 { |
|
1379 GST_CAT_LOG_OBJECT (queue_dataflow, queue, |
|
1380 "exit because we received UNEXPECTED"); |
|
1381 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1382 gst_buffer_unref (buffer); |
|
1383 |
|
1384 return GST_FLOW_UNEXPECTED; |
|
1385 } |
|
1386 } |
|
1387 |
|
/* Dequeue one item and push it downstream on the source pad.
 *
 * The visible caller (gst_queue_loop) holds the queue lock when calling
 * this. The lock is released around the actual push and taken again
 * afterwards with GST_QUEUE_MUTEX_LOCK_CHECK, which can jump to
 * out_flushing.
 *
 * Returns: the result of pushing a buffer; GST_FLOW_UNEXPECTED after an EOS
 * event was pushed; GST_FLOW_OK after any other event, or after a buffer
 * push returned UNEXPECTED and the queue was drained without finding a
 * pushable event; GST_FLOW_ERROR when nothing could be dequeued;
 * GST_FLOW_WRONG_STATE when re-taking the lock bailed out. */
static GstFlowReturn
gst_queue_push_one (GstQueue * queue)
{
  GstFlowReturn result = GST_FLOW_OK;
  GstMiniObject *data;

  data = gst_queue_locked_dequeue (queue);
  if (data == NULL)
    goto no_item;

  /* re-entered from the drain loop below with a pushable event in @data */
next:
  if (GST_IS_BUFFER (data)) {
    GstBuffer *buffer;
    GstCaps *caps;

    buffer = GST_BUFFER_CAST (data);
    caps = GST_BUFFER_CAPS (buffer);

    /* do not hold the queue lock while calling downstream */
    GST_QUEUE_MUTEX_UNLOCK (queue);

    /* set caps before pushing the buffer so that core does not try to do
     * something fancy to check if this is possible. */
    if (caps && caps != GST_PAD_CAPS (queue->srcpad))
      gst_pad_set_caps (queue->srcpad, caps);

    result = gst_pad_push (queue->srcpad, buffer);

    /* need to check for srcresult here as well */
    GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
    if (result == GST_FLOW_UNEXPECTED) {
      GST_CAT_LOG_OBJECT (queue_dataflow, queue,
          "got UNEXPECTED from downstream");
      /* stop pushing buffers, we dequeue all items until we see an item that we
       * can push again, which is EOS or NEWSEGMENT. If there is nothing in the
       * queue we can push, we set a flag to make the sinkpad refuse more
       * buffers with an UNEXPECTED return value until we receive something
       * pushable again or we get flushed. */
      while ((data = gst_queue_locked_dequeue (queue))) {
        if (GST_IS_BUFFER (data)) {
          GST_CAT_LOG_OBJECT (queue_dataflow, queue,
              "dropping UNEXPECTED buffer %p", data);
          gst_buffer_unref (GST_BUFFER_CAST (data));
        } else if (GST_IS_EVENT (data)) {
          GstEvent *event = GST_EVENT_CAST (data);
          GstEventType type = GST_EVENT_TYPE (event);

          if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
            /* we found a pushable item in the queue, push it out */
            GST_CAT_LOG_OBJECT (queue_dataflow, queue,
                "pushing pushable event %s after UNEXPECTED",
                GST_EVENT_TYPE_NAME (event));
            goto next;
          }
          GST_CAT_LOG_OBJECT (queue_dataflow, queue,
              "dropping UNEXPECTED event %p", event);
          gst_event_unref (event);
        }
      }
      /* no more items in the queue. Set the unexpected flag so that upstream
       * make us refuse any more buffers on the sinkpad. Since we will still
       * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
       * task function does not shut down. */
      queue->unexpected = TRUE;
      result = GST_FLOW_OK;
    }
  } else if (GST_IS_EVENT (data)) {
    GstEvent *event = GST_EVENT_CAST (data);
    /* read the type before pushing: the event is handed over to
     * gst_pad_push_event() below */
    GstEventType type = GST_EVENT_TYPE (event);

    GST_QUEUE_MUTEX_UNLOCK (queue);

    gst_pad_push_event (queue->srcpad, event);

    GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
    /* if we're EOS, return UNEXPECTED so that the task pauses. */
    if (type == GST_EVENT_EOS) {
      GST_CAT_LOG_OBJECT (queue_dataflow, queue,
          "pushed EOS event %p, return UNEXPECTED", event);
      result = GST_FLOW_UNEXPECTED;
    }
  }
  return result;

  /* ERRORS */
no_item:
  {
    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
        "exit because we have no item in the queue");
    return GST_FLOW_ERROR;
  }
out_flushing:
  {
    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
    return GST_FLOW_WRONG_STATE;
  }
}
|
1486 |
|
1487 /* called repeadedly with @pad as the source pad. This function should push out |
|
1488 * data to the peer element. */ |
|
1489 static void |
|
1490 gst_queue_loop (GstPad * pad) |
|
1491 { |
|
1492 GstQueue *queue; |
|
1493 GstFlowReturn ret; |
|
1494 |
|
1495 queue = GST_QUEUE (GST_PAD_PARENT (pad)); |
|
1496 |
|
1497 /* have to lock for thread-safety */ |
|
1498 GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); |
|
1499 |
|
1500 if (gst_queue_is_empty (queue)) { |
|
1501 gboolean started; |
|
1502 |
|
1503 /* pause the timer while we wait. The fact that we are waiting does not mean |
|
1504 * the byterate on the output pad is lower */ |
|
1505 if ((started = queue->out_timer_started)) |
|
1506 g_timer_stop (queue->out_timer); |
|
1507 |
|
1508 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, |
|
1509 "queue is empty, waiting for new data"); |
|
1510 do { |
|
1511 /* Wait for data to be available, we could be unlocked because of a flush. */ |
|
1512 GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing); |
|
1513 } |
|
1514 while (gst_queue_is_empty (queue)); |
|
1515 |
|
1516 /* and continue if we were running before */ |
|
1517 if (started) |
|
1518 g_timer_continue (queue->out_timer); |
|
1519 } |
|
1520 ret = gst_queue_push_one (queue); |
|
1521 queue->srcresult = ret; |
|
1522 if (ret != GST_FLOW_OK) |
|
1523 goto out_flushing; |
|
1524 |
|
1525 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1526 |
|
1527 return; |
|
1528 |
|
1529 /* ERRORS */ |
|
1530 out_flushing: |
|
1531 { |
|
1532 gst_pad_pause_task (queue->srcpad); |
|
1533 GST_CAT_LOG_OBJECT (queue_dataflow, queue, |
|
1534 "pause task, reason: %s", gst_flow_get_name (queue->srcresult)); |
|
1535 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1536 return; |
|
1537 } |
|
1538 } |
|
1539 |
|
1540 static gboolean |
|
1541 gst_queue_handle_src_event (GstPad * pad, GstEvent * event) |
|
1542 { |
|
1543 gboolean res = TRUE; |
|
1544 GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad)); |
|
1545 |
|
1546 #ifndef GST_DISABLE_GST_DEBUG |
|
1547 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)", |
|
1548 event, GST_EVENT_TYPE_NAME (event)); |
|
1549 #endif |
|
1550 |
|
1551 /* just forward upstream */ |
|
1552 res = gst_pad_push_event (queue->sinkpad, event); |
|
1553 |
|
1554 return res; |
|
1555 } |
|
1556 |
|
1557 static gboolean |
|
1558 gst_queue_peer_query (GstQueue * queue, GstPad * pad, GstQuery * query) |
|
1559 { |
|
1560 gboolean ret = FALSE; |
|
1561 GstPad *peer; |
|
1562 |
|
1563 if ((peer = gst_pad_get_peer (pad))) { |
|
1564 ret = gst_pad_query (peer, query); |
|
1565 gst_object_unref (peer); |
|
1566 } |
|
1567 return ret; |
|
1568 } |
|
1569 |
|
1570 static gboolean |
|
1571 gst_queue_handle_src_query (GstPad * pad, GstQuery * query) |
|
1572 { |
|
1573 GstQueue *queue; |
|
1574 |
|
1575 queue = GST_QUEUE (GST_PAD_PARENT (pad)); |
|
1576 |
|
1577 switch (GST_QUERY_TYPE (query)) { |
|
1578 case GST_QUERY_POSITION: |
|
1579 { |
|
1580 gint64 peer_pos; |
|
1581 GstFormat format; |
|
1582 |
|
1583 if (!gst_queue_peer_query (queue, queue->sinkpad, query)) |
|
1584 goto peer_failed; |
|
1585 |
|
1586 /* get peer position */ |
|
1587 gst_query_parse_position (query, &format, &peer_pos); |
|
1588 |
|
1589 /* FIXME: this code assumes that there's no discont in the queue */ |
|
1590 switch (format) { |
|
1591 case GST_FORMAT_BYTES: |
|
1592 peer_pos -= queue->cur_level.bytes; |
|
1593 break; |
|
1594 case GST_FORMAT_TIME: |
|
1595 peer_pos -= queue->cur_level.time; |
|
1596 break; |
|
1597 default: |
|
1598 GST_WARNING_OBJECT (queue, "dropping query in %s format, don't " |
|
1599 "know how to adjust value", gst_format_get_name (format)); |
|
1600 return FALSE; |
|
1601 } |
|
1602 /* set updated position */ |
|
1603 gst_query_set_position (query, format, peer_pos); |
|
1604 break; |
|
1605 } |
|
1606 case GST_QUERY_DURATION: |
|
1607 { |
|
1608 GST_DEBUG_OBJECT (queue, "waiting for preroll in duration query"); |
|
1609 |
|
1610 GST_QUEUE_MUTEX_LOCK (queue); |
|
1611 /* we have to wait until the upstream element is at least paused, which |
|
1612 * happened when we received a first item. */ |
|
1613 while (gst_queue_is_empty (queue)) { |
|
1614 GST_QUEUE_WAIT_ADD_CHECK (queue, flushing); |
|
1615 } |
|
1616 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1617 |
|
1618 if (!gst_queue_peer_query (queue, queue->sinkpad, query)) |
|
1619 goto peer_failed; |
|
1620 |
|
1621 GST_DEBUG_OBJECT (queue, "peer query success"); |
|
1622 break; |
|
1623 } |
|
1624 default: |
|
1625 /* peer handled other queries */ |
|
1626 if (!gst_queue_peer_query (queue, queue->sinkpad, query)) |
|
1627 goto peer_failed; |
|
1628 break; |
|
1629 } |
|
1630 |
|
1631 return TRUE; |
|
1632 |
|
1633 /* ERRORS */ |
|
1634 peer_failed: |
|
1635 { |
|
1636 GST_DEBUG_OBJECT (queue, "failed peer query"); |
|
1637 return FALSE; |
|
1638 } |
|
1639 flushing: |
|
1640 { |
|
1641 GST_DEBUG_OBJECT (queue, "flushing while waiting for query"); |
|
1642 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1643 return FALSE; |
|
1644 } |
|
1645 } |
|
1646 |
|
1647 static GstFlowReturn |
|
1648 gst_queue_get_range (GstPad * pad, guint64 offset, guint length, |
|
1649 GstBuffer ** buffer) |
|
1650 { |
|
1651 GstQueue *queue; |
|
1652 GstFlowReturn ret; |
|
1653 |
|
1654 queue = GST_QUEUE_CAST (gst_pad_get_parent (pad)); |
|
1655 |
|
1656 GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); |
|
1657 length = (length == -1) ? DEFAULT_BUFFER_SIZE : length; |
|
1658 offset = (offset == -1) ? queue->reading_pos : offset; |
|
1659 |
|
1660 /* function will block when the range is not yet available */ |
|
1661 ret = gst_queue_create_read (queue, offset, length, buffer); |
|
1662 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1663 |
|
1664 gst_object_unref (queue); |
|
1665 |
|
1666 return ret; |
|
1667 |
|
1668 /* ERRORS */ |
|
1669 out_flushing: |
|
1670 { |
|
1671 GST_DEBUG_OBJECT (queue, "we are flushing"); |
|
1672 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1673 return GST_FLOW_WRONG_STATE; |
|
1674 } |
|
1675 } |
|
1676 |
|
1677 static gboolean |
|
1678 gst_queue_src_checkgetrange_function (GstPad * pad) |
|
1679 { |
|
1680 GstQueue *queue; |
|
1681 gboolean ret; |
|
1682 |
|
1683 queue = GST_QUEUE (gst_pad_get_parent (pad)); |
|
1684 /* we can operate in pull mode when we are using a tempfile */ |
|
1685 ret = QUEUE_IS_USING_TEMP_FILE (queue); |
|
1686 gst_object_unref (GST_OBJECT (queue)); |
|
1687 |
|
1688 return ret; |
|
1689 } |
|
1690 |
|
1691 /* sink currently only operates in push mode */ |
|
1692 static gboolean |
|
1693 gst_queue_sink_activate_push (GstPad * pad, gboolean active) |
|
1694 { |
|
1695 gboolean result = TRUE; |
|
1696 GstQueue *queue; |
|
1697 |
|
1698 queue = GST_QUEUE (gst_pad_get_parent (pad)); |
|
1699 |
|
1700 if (active) { |
|
1701 GST_QUEUE_MUTEX_LOCK (queue); |
|
1702 GST_DEBUG_OBJECT (queue, "activating push mode"); |
|
1703 queue->srcresult = GST_FLOW_OK; |
|
1704 queue->is_eos = FALSE; |
|
1705 queue->unexpected = FALSE; |
|
1706 reset_rate_timer (queue); |
|
1707 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1708 } else { |
|
1709 /* unblock chain function */ |
|
1710 GST_QUEUE_MUTEX_LOCK (queue); |
|
1711 GST_DEBUG_OBJECT (queue, "deactivating push mode"); |
|
1712 queue->srcresult = GST_FLOW_WRONG_STATE; |
|
1713 gst_queue_locked_flush (queue); |
|
1714 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1715 } |
|
1716 |
|
1717 gst_object_unref (queue); |
|
1718 |
|
1719 return result; |
|
1720 } |
|
1721 |
|
1722 /* src operating in push mode, we start a task on the source pad that pushes out |
|
1723 * buffers from the queue */ |
|
1724 static gboolean |
|
1725 gst_queue_src_activate_push (GstPad * pad, gboolean active) |
|
1726 { |
|
1727 gboolean result = FALSE; |
|
1728 GstQueue *queue; |
|
1729 |
|
1730 queue = GST_QUEUE (gst_pad_get_parent (pad)); |
|
1731 |
|
1732 if (active) { |
|
1733 GST_QUEUE_MUTEX_LOCK (queue); |
|
1734 GST_DEBUG_OBJECT (queue, "activating push mode"); |
|
1735 queue->srcresult = GST_FLOW_OK; |
|
1736 queue->is_eos = FALSE; |
|
1737 queue->unexpected = FALSE; |
|
1738 result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); |
|
1739 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1740 } else { |
|
1741 /* unblock loop function */ |
|
1742 GST_QUEUE_MUTEX_LOCK (queue); |
|
1743 GST_DEBUG_OBJECT (queue, "deactivating push mode"); |
|
1744 queue->srcresult = GST_FLOW_WRONG_STATE; |
|
1745 /* the item add signal will unblock */ |
|
1746 g_cond_signal (queue->item_add); |
|
1747 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1748 |
|
1749 /* step 2, make sure streaming finishes */ |
|
1750 result = gst_pad_stop_task (pad); |
|
1751 } |
|
1752 |
|
1753 gst_object_unref (queue); |
|
1754 |
|
1755 return result; |
|
1756 } |
|
1757 |
|
1758 /* pull mode, downstream will call our getrange function */ |
|
1759 static gboolean |
|
1760 gst_queue_src_activate_pull (GstPad * pad, gboolean active) |
|
1761 { |
|
1762 gboolean result; |
|
1763 GstQueue *queue; |
|
1764 |
|
1765 queue = GST_QUEUE (gst_pad_get_parent (pad)); |
|
1766 |
|
1767 if (active) { |
|
1768 if (QUEUE_IS_USING_TEMP_FILE (queue)) { |
|
1769 GST_QUEUE_MUTEX_LOCK (queue); |
|
1770 GST_DEBUG_OBJECT (queue, "activating pull mode"); |
|
1771 queue->srcresult = GST_FLOW_OK; |
|
1772 queue->is_eos = FALSE; |
|
1773 queue->unexpected = FALSE; |
|
1774 result = TRUE; |
|
1775 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1776 } else { |
|
1777 GST_QUEUE_MUTEX_LOCK (queue); |
|
1778 GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode"); |
|
1779 /* this is not allowed, we cannot operate in pull mode without a temp |
|
1780 * file. */ |
|
1781 queue->srcresult = GST_FLOW_WRONG_STATE; |
|
1782 result = FALSE; |
|
1783 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1784 } |
|
1785 } else { |
|
1786 GST_QUEUE_MUTEX_LOCK (queue); |
|
1787 GST_DEBUG_OBJECT (queue, "deactivating pull mode"); |
|
1788 queue->srcresult = GST_FLOW_WRONG_STATE; |
|
1789 /* this will unlock getrange */ |
|
1790 g_cond_signal (queue->item_add); |
|
1791 result = TRUE; |
|
1792 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1793 } |
|
1794 gst_object_unref (queue); |
|
1795 |
|
1796 return result; |
|
1797 } |
|
1798 |
|
1799 static GstStateChangeReturn |
|
1800 gst_queue_change_state (GstElement * element, GstStateChange transition) |
|
1801 { |
|
1802 GstQueue *queue; |
|
1803 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS; |
|
1804 |
|
1805 queue = GST_QUEUE (element); |
|
1806 |
|
1807 switch (transition) { |
|
1808 case GST_STATE_CHANGE_NULL_TO_READY: |
|
1809 break; |
|
1810 case GST_STATE_CHANGE_READY_TO_PAUSED: |
|
1811 if (QUEUE_IS_USING_TEMP_FILE (queue)) { |
|
1812 if (!gst_queue_open_temp_location_file (queue)) |
|
1813 ret = GST_STATE_CHANGE_FAILURE; |
|
1814 } |
|
1815 queue->segment_event_received = FALSE; |
|
1816 queue->starting_segment = NULL; |
|
1817 break; |
|
1818 case GST_STATE_CHANGE_PAUSED_TO_PLAYING: |
|
1819 break; |
|
1820 default: |
|
1821 break; |
|
1822 } |
|
1823 |
|
1824 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); |
|
1825 |
|
1826 switch (transition) { |
|
1827 case GST_STATE_CHANGE_PLAYING_TO_PAUSED: |
|
1828 break; |
|
1829 case GST_STATE_CHANGE_PAUSED_TO_READY: |
|
1830 if (QUEUE_IS_USING_TEMP_FILE (queue)) |
|
1831 gst_queue_close_temp_location_file (queue); |
|
1832 if (queue->starting_segment != NULL) { |
|
1833 gst_event_unref (queue->starting_segment); |
|
1834 queue->starting_segment = NULL; |
|
1835 } |
|
1836 break; |
|
1837 case GST_STATE_CHANGE_READY_TO_NULL: |
|
1838 break; |
|
1839 default: |
|
1840 break; |
|
1841 } |
|
1842 |
|
1843 return ret; |
|
1844 } |
|
1845 |
|
/* Changing the capacity of the queue must wake up
 * the _chain function, it might have more room now
 * to store the buffer/event in the queue.
 *
 * @q: the queue whose item_del condition is signalled.
 *
 * The macro operates on its argument instead of relying on a variable named
 * "queue" being in scope at the call site. The trailing semicolon is kept so
 * that existing call sites keep compiling unchanged. */
#define QUEUE_CAPACITY_CHANGE(q) \
  g_cond_signal ((q)->item_del);
|
1851 |
|
/* Changing the minimum required fill level must
 * wake up the _loop function as it might now
 * be able to proceed.
 *
 * @q: the queue whose item_add condition is signalled.
 *
 * The macro operates on its argument instead of relying on a variable named
 * "queue" being in scope at the call site. The trailing semicolon is kept so
 * that existing call sites keep compiling unchanged.
 */
#define QUEUE_THRESHOLD_CHANGE(q) \
  g_cond_signal ((q)->item_add);
|
1858 |
|
1859 static gboolean |
|
1860 gst_queue_set_temp_location (GstQueue * queue, const gchar * location) |
|
1861 { |
|
1862 GstState state; |
|
1863 |
|
1864 /* the element must be stopped in order to do this */ |
|
1865 GST_OBJECT_LOCK (queue); |
|
1866 state = GST_STATE (queue); |
|
1867 if (state != GST_STATE_READY && state != GST_STATE_NULL) |
|
1868 goto wrong_state; |
|
1869 GST_OBJECT_UNLOCK (queue); |
|
1870 |
|
1871 /* set new location */ |
|
1872 g_free (queue->temp_location); |
|
1873 queue->temp_location = g_strdup (location); |
|
1874 |
|
1875 g_object_notify (G_OBJECT (queue), "temp-location"); |
|
1876 |
|
1877 return TRUE; |
|
1878 |
|
1879 /* ERROR */ |
|
1880 wrong_state: |
|
1881 { |
|
1882 GST_DEBUG_OBJECT (queue, "setting temp-location in wrong state"); |
|
1883 GST_OBJECT_UNLOCK (queue); |
|
1884 return FALSE; |
|
1885 } |
|
1886 } |
|
1887 |
|
1888 static void |
|
1889 gst_queue_set_property (GObject * object, |
|
1890 guint prop_id, const GValue * value, GParamSpec * pspec) |
|
1891 { |
|
1892 GstQueue *queue = GST_QUEUE (object); |
|
1893 |
|
1894 /* someone could change levels here, and since this |
|
1895 * affects the get/put funcs, we need to lock for safety. */ |
|
1896 GST_QUEUE_MUTEX_LOCK (queue); |
|
1897 |
|
1898 switch (prop_id) { |
|
1899 case PROP_MAX_SIZE_BYTES: |
|
1900 queue->max_level.bytes = g_value_get_uint (value); |
|
1901 QUEUE_CAPACITY_CHANGE (queue); |
|
1902 break; |
|
1903 case PROP_MAX_SIZE_BUFFERS: |
|
1904 queue->max_level.buffers = g_value_get_uint (value); |
|
1905 QUEUE_CAPACITY_CHANGE (queue); |
|
1906 break; |
|
1907 case PROP_MAX_SIZE_TIME: |
|
1908 queue->max_level.time = g_value_get_uint64 (value); |
|
1909 /* set rate_time to the same value. We use an extra field in the level |
|
1910 * structure so that we can easily access and compare it */ |
|
1911 queue->max_level.rate_time = queue->max_level.time; |
|
1912 QUEUE_CAPACITY_CHANGE (queue); |
|
1913 break; |
|
1914 case PROP_USE_BUFFERING: |
|
1915 queue->use_buffering = g_value_get_boolean (value); |
|
1916 break; |
|
1917 case PROP_USE_RATE_ESTIMATE: |
|
1918 queue->use_rate_estimate = g_value_get_boolean (value); |
|
1919 break; |
|
1920 case PROP_LOW_PERCENT: |
|
1921 queue->low_percent = g_value_get_int (value); |
|
1922 break; |
|
1923 case PROP_HIGH_PERCENT: |
|
1924 queue->high_percent = g_value_get_int (value); |
|
1925 break; |
|
1926 case PROP_TEMP_LOCATION: |
|
1927 gst_queue_set_temp_location (queue, g_value_dup_string (value)); |
|
1928 break; |
|
1929 default: |
|
1930 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); |
|
1931 break; |
|
1932 } |
|
1933 |
|
1934 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1935 } |
|
1936 |
|
1937 static void |
|
1938 gst_queue_get_property (GObject * object, |
|
1939 guint prop_id, GValue * value, GParamSpec * pspec) |
|
1940 { |
|
1941 GstQueue *queue = GST_QUEUE (object); |
|
1942 |
|
1943 GST_QUEUE_MUTEX_LOCK (queue); |
|
1944 |
|
1945 switch (prop_id) { |
|
1946 case PROP_CUR_LEVEL_BYTES: |
|
1947 g_value_set_uint (value, queue->cur_level.bytes); |
|
1948 break; |
|
1949 case PROP_CUR_LEVEL_BUFFERS: |
|
1950 g_value_set_uint (value, queue->cur_level.buffers); |
|
1951 break; |
|
1952 case PROP_CUR_LEVEL_TIME: |
|
1953 g_value_set_uint64 (value, queue->cur_level.time); |
|
1954 break; |
|
1955 case PROP_MAX_SIZE_BYTES: |
|
1956 g_value_set_uint (value, queue->max_level.bytes); |
|
1957 break; |
|
1958 case PROP_MAX_SIZE_BUFFERS: |
|
1959 g_value_set_uint (value, queue->max_level.buffers); |
|
1960 break; |
|
1961 case PROP_MAX_SIZE_TIME: |
|
1962 g_value_set_uint64 (value, queue->max_level.time); |
|
1963 break; |
|
1964 case PROP_USE_BUFFERING: |
|
1965 g_value_set_boolean (value, queue->use_buffering); |
|
1966 break; |
|
1967 case PROP_USE_RATE_ESTIMATE: |
|
1968 g_value_set_boolean (value, queue->use_rate_estimate); |
|
1969 break; |
|
1970 case PROP_LOW_PERCENT: |
|
1971 g_value_set_int (value, queue->low_percent); |
|
1972 break; |
|
1973 case PROP_HIGH_PERCENT: |
|
1974 g_value_set_int (value, queue->high_percent); |
|
1975 break; |
|
1976 case PROP_TEMP_LOCATION: |
|
1977 g_value_set_string (value, queue->temp_location); |
|
1978 break; |
|
1979 default: |
|
1980 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); |
|
1981 break; |
|
1982 } |
|
1983 |
|
1984 GST_QUEUE_MUTEX_UNLOCK (queue); |
|
1985 } |
|
1986 |
|
1987 static gboolean |
|
1988 plugin_init (GstPlugin * plugin) |
|
1989 { |
|
1990 GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); |
|
1991 GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, |
|
1992 "dataflow inside the queue element"); |
|
1993 |
|
1994 #ifdef ENABLE_NLS |
|
1995 GST_DEBUG ("binding text domain %s to locale dir %s", GETTEXT_PACKAGE, |
|
1996 LOCALEDIR); |
|
1997 bindtextdomain (GETTEXT_PACKAGE, LOCALEDIR); |
|
1998 #endif /* ENABLE_NLS */ |
|
1999 |
|
2000 return gst_element_register (plugin, "queue2", GST_RANK_NONE, GST_TYPE_QUEUE); |
|
2001 } |
|
2002 |
|
2003 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, |
|
2004 GST_VERSION_MINOR, |
|
2005 "queue2", |
|
2006 "Queue newer version", plugin_init, VERSION, GST_LICENSE, |
|
2007 GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN); |
|
2008 |
|
2009 #ifdef __SYMBIAN32__ |
|
2010 EXPORT_C |
|
2011 #endif |
|
2012 GstPluginDesc* _GST_PLUGIN_DESC() |
|
2013 { |
|
2014 return &gst_plugin_desc; |
|
2015 } |
|
2016 |