gst_plugins_base/gst/playback/gstqueue2.c
changeset 8 4a7fac7dd34a
parent 0 0e761a78d257
child 30 7e817e7e631c
equal deleted inserted replaced
7:71e347f905f2 8:4a7fac7dd34a
    25 /**
    25 /**
    26  * SECTION:element-queue2
    26  * SECTION:element-queue2
    27  * @short_description: Asynchronous data queue.
    27  * @short_description: Asynchronous data queue.
    28  *
    28  *
    29  * Data is queued until one of the limits specified by the
    29  * Data is queued until one of the limits specified by the
    30  * #GstQueue:max-size-buffers, #GstQueue:max-size-bytes and/or
    30  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
    31  * #GstQueue:max-size-time properties has been reached. Any attempt to push
    31  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
    32  * more buffers into the queue will block the pushing thread until more space
    32  * more buffers into the queue will block the pushing thread until more space
    33  * becomes available.
    33  * becomes available.
    34  *
    34  *
    35  * The queue will create a new thread on the source pad to decouple the
    35  * The queue will create a new thread on the source pad to decouple the
    36  * processing on sink and source pad.
    36  * processing on sink and source pad.
    37  *
    37  *
    38  * You can query how many buffers are queued by reading the
    38  * You can query how many buffers are queued by reading the
    39  * #GstQueue:current-level-buffers property.
    39  * #GstQueue2:current-level-buffers property.
    40  *
    40  *
    41  * The default queue size limits are 100 buffers, 2MB of data, or
    41  * The default queue size limits are 100 buffers, 2MB of data, or
    42  * two seconds worth of data, whichever is reached first.
    42  * two seconds worth of data, whichever is reached first.
    43  *
    43  *
    44  * If you set temp-location, the element will buffer data on the file
    44  * If you set temp-tmpl to a value such as /tmp/gstreamer-XXXXXX, the element
    45  * specified by it. By using this, it will buffer the entire 
    45  * will allocate a random free filename and buffer data in the file.
    46  * stream data on the file independently of the queue size limits, they
    46  * By using this, it will buffer the entire stream data on the file independently
    47  * will only be used for buffering statistics.
    47  * of the queue size limits, they will only be used for buffering statistics.
    48  *
    48  *
       
    49  * Since 0.10.24, setting the temp-location property with a filename is deprecated
       
    50  * because it's impossible to securely open a temporary file in this way. The
       
    51  * property will still be used to notify the application of the allocated
       
    52  * filename, though.
       
    53  *
       
    54  * Last reviewed on 2009-07-10 (0.10.24)
    49  */
    55  */
    50 
    56 
    51 #ifdef HAVE_CONFIG_H
    57 #ifdef HAVE_CONFIG_H
    52 #include "config.h"
    58 #include "config.h"
    53 #endif
    59 #endif
    55 #include <glib/gstdio.h>
    61 #include <glib/gstdio.h>
    56 
    62 
    57 #include <gst/gst.h>
    63 #include <gst/gst.h>
    58 #include <gst/gst-i18n-plugin.h>
    64 #include <gst/gst-i18n-plugin.h>
    59 
    65 
    60 #ifdef __SYMBIAN32__
    66 #ifdef G_OS_WIN32
    61 #include <glib_global.h>
    67 #include <io.h>                 /* lseek, open, close, read */
       
    68 #undef lseek
       
    69 #define lseek _lseeki64
       
    70 #undef off_t
       
    71 #define off_t guint64
       
    72 #else
       
    73 #include <unistd.h>
    62 #endif
    74 #endif
       
    75 
    63 static const GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
    76 static const GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
    64     "Generic",
    77     "Generic",
    65     "Simple data queue",
    78     "Simple data queue",
    66     "Erik Walthinsen <omega@cse.ogi.edu>, "
    79     "Erik Walthinsen <omega@cse.ogi.edu>, "
    67     "Wim Taymans <wim.taymans@gmail.com>");
    80     "Wim Taymans <wim.taymans@gmail.com>");
    94 #define DEFAULT_LOW_PERCENT        10
   107 #define DEFAULT_LOW_PERCENT        10
    95 #define DEFAULT_HIGH_PERCENT       99
   108 #define DEFAULT_HIGH_PERCENT       99
    96 
   109 
    97 /* other defines */
   110 /* other defines */
    98 #define DEFAULT_BUFFER_SIZE 4096
   111 #define DEFAULT_BUFFER_SIZE 4096
    99 #define QUEUE_IS_USING_TEMP_FILE(queue) (queue->temp_location != NULL)
   112 #define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_location_set || (queue)->temp_template != NULL)
   100 
   113 
   101 enum
   114 enum
   102 {
   115 {
   103   PROP_0,
   116   PROP_0,
   104   PROP_CUR_LEVEL_BUFFERS,
   117   PROP_CUR_LEVEL_BUFFERS,
   109   PROP_MAX_SIZE_TIME,
   122   PROP_MAX_SIZE_TIME,
   110   PROP_USE_BUFFERING,
   123   PROP_USE_BUFFERING,
   111   PROP_USE_RATE_ESTIMATE,
   124   PROP_USE_RATE_ESTIMATE,
   112   PROP_LOW_PERCENT,
   125   PROP_LOW_PERCENT,
   113   PROP_HIGH_PERCENT,
   126   PROP_HIGH_PERCENT,
       
   127   PROP_TEMP_TEMPLATE,
   114   PROP_TEMP_LOCATION
   128   PROP_TEMP_LOCATION
   115 };
   129 };
   116 
   130 
   117 #define GST_TYPE_QUEUE \
   131 #define GST_TYPE_QUEUE \
   118   (gst_queue_get_type())
   132   (gst_queue_get_type())
   197   GCond *item_add;              /* signals buffers now available for reading */
   211   GCond *item_add;              /* signals buffers now available for reading */
   198   gboolean waiting_del;
   212   gboolean waiting_del;
   199   GCond *item_del;              /* signals space now available for writing */
   213   GCond *item_del;              /* signals space now available for writing */
   200 
   214 
   201   /* temp location stuff */
   215   /* temp location stuff */
       
   216   gchar *temp_template;
       
   217   gboolean temp_location_set;
   202   gchar *temp_location;
   218   gchar *temp_location;
   203   FILE *temp_file;
   219   FILE *temp_file;
   204   guint64 writing_pos;
   220   guint64 writing_pos;
   205   guint64 reading_pos;
   221   guint64 reading_pos;
       
   222   guint64 max_reading_pos;
   206   /* we need this to send the first new segment event of the stream
   223   /* we need this to send the first new segment event of the stream
   207    * because we can't save it on the file */
   224    * because we can't save it on the file */
   208   gboolean segment_event_received;
   225   gboolean segment_event_received;
   209   GstEvent *starting_segment;
   226   GstEvent *starting_segment;
   210 
   227 
   226                       queue->cur_level.bytes, \
   243                       queue->cur_level.bytes, \
   227                       queue->max_level.bytes, \
   244                       queue->max_level.bytes, \
   228                       queue->cur_level.time, \
   245                       queue->cur_level.time, \
   229                       queue->max_level.time, \
   246                       queue->max_level.time, \
   230                       (guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \
   247                       (guint64) (QUEUE_IS_USING_TEMP_FILE(queue) ? \
   231                         queue->writing_pos - queue->reading_pos : \
   248                         queue->writing_pos - queue->max_reading_pos : \
   232                         queue->queue->length))
   249                         queue->queue->length))
   233 
   250 
   234 #define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START {                          \
   251 #define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START {                          \
   235   g_mutex_lock (q->qlock);                                              \
   252   g_mutex_lock (q->qlock);                                              \
   236 } G_STMT_END
   253 } G_STMT_END
   365 
   382 
   366   /* properties */
   383   /* properties */
   367   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
   384   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
   368       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
   385       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
   369           "Current amount of data in the queue (bytes)",
   386           "Current amount of data in the queue (bytes)",
   370           0, G_MAXUINT, 0, G_PARAM_READABLE));
   387           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   371   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
   388   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
   372       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
   389       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
   373           "Current number of buffers in the queue",
   390           "Current number of buffers in the queue",
   374           0, G_MAXUINT, 0, G_PARAM_READABLE));
   391           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   375   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
   392   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
   376       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
   393       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
   377           "Current amount of data in the queue (in ns)",
   394           "Current amount of data in the queue (in ns)",
   378           0, G_MAXUINT64, 0, G_PARAM_READABLE));
   395           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
   379 
   396 
   380   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
   397   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
   381       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
   398       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
   382           "Max. amount of data in the queue (bytes, 0=disable)",
   399           "Max. amount of data in the queue (bytes, 0=disable)",
   383           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE));
   400           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
       
   401           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   384   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
   402   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
   385       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
   403       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
   386           "Max. number of buffers in the queue (0=disable)",
   404           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
   387           0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE));
   405           DEFAULT_MAX_SIZE_BUFFERS,
       
   406           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   388   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
   407   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
   389       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
   408       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
   390           "Max. amount of data in the queue (in ns, 0=disable)",
   409           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
   391           0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE));
   410           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   392 
   411 
   393   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
   412   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
   394       g_param_spec_boolean ("use-buffering", "Use buffering",
   413       g_param_spec_boolean ("use-buffering", "Use buffering",
   395           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
   414           "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
   396           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE));
   415           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   397   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
   416   g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
   398       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
   417       g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
   399           "Estimate the bitrate of the stream to calculate time level",
   418           "Estimate the bitrate of the stream to calculate time level",
   400           DEFAULT_USE_RATE_ESTIMATE, G_PARAM_READWRITE));
   419           DEFAULT_USE_RATE_ESTIMATE,
       
   420           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   401   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
   421   g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
   402       g_param_spec_int ("low-percent", "Low percent",
   422       g_param_spec_int ("low-percent", "Low percent",
   403           "Low threshold for buffering to start",
   423           "Low threshold for buffering to start", 0, 100, DEFAULT_LOW_PERCENT,
   404           0, 100, DEFAULT_LOW_PERCENT, G_PARAM_READWRITE));
   424           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   405   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
   425   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
   406       g_param_spec_int ("high-percent", "High percent",
   426       g_param_spec_int ("high-percent", "High percent",
   407           "High threshold for buffering to finish",
   427           "High threshold for buffering to finish", 0, 100,
   408           0, 100, DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE));
   428           DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
       
   429 
       
   430   g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
       
   431       g_param_spec_string ("temp-template", "Temporary File Template",
       
   432           "File template to store temporary files in, should contain directory "
       
   433           "and XXXXXX. (NULL == disabled)",
       
   434           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   409 
   435 
   410   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
   436   g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
   411       g_param_spec_string ("temp-location", "Temporary File Location",
   437       g_param_spec_string ("temp-location", "Temporary File Location",
   412           "Location of a temporary file to store data in",
   438           "Location to store temporary files in (Deprecated: Only read this "
   413           NULL, G_PARAM_READWRITE));
   439           "property, use temp-tmpl to configure the name template)",
       
   440           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   414 
   441 
   415   gst_element_class_add_pad_template (gstelement_class,
   442   gst_element_class_add_pad_template (gstelement_class,
   416       gst_static_pad_template_get (&srctemplate));
   443       gst_static_pad_template_get (&srctemplate));
   417   gst_element_class_add_pad_template (gstelement_class,
   444   gst_element_class_add_pad_template (gstelement_class,
   418       gst_static_pad_template_get (&sinktemplate));
   445       gst_static_pad_template_get (&sinktemplate));
   485   queue->waiting_del = FALSE;
   512   queue->waiting_del = FALSE;
   486   queue->item_del = g_cond_new ();
   513   queue->item_del = g_cond_new ();
   487   queue->queue = g_queue_new ();
   514   queue->queue = g_queue_new ();
   488 
   515 
   489   /* tempfile related */
   516   /* tempfile related */
       
   517   queue->temp_template = NULL;
   490   queue->temp_location = NULL;
   518   queue->temp_location = NULL;
   491   queue->temp_file = NULL;
   519   queue->temp_location_set = FALSE;
   492 
   520 
   493   GST_DEBUG_OBJECT (queue,
   521   GST_DEBUG_OBJECT (queue,
   494       "initialized queue's not_empty & not_full conditions");
   522       "initialized queue's not_empty & not_full conditions");
   495 }
   523 }
   496 
   524 
   514   g_cond_free (queue->item_del);
   542   g_cond_free (queue->item_del);
   515   g_timer_destroy (queue->in_timer);
   543   g_timer_destroy (queue->in_timer);
   516   g_timer_destroy (queue->out_timer);
   544   g_timer_destroy (queue->out_timer);
   517 
   545 
   518   /* temp_file path cleanup  */
   546   /* temp_file path cleanup  */
   519   if (queue->temp_location != NULL)
   547   g_free (queue->temp_template);
   520     g_free (queue->temp_location);
   548   g_free (queue->temp_location);
   521 
   549 
   522   G_OBJECT_CLASS (parent_class)->finalize (object);
   550   G_OBJECT_CLASS (parent_class)->finalize (object);
   523 }
   551 }
   524 
   552 
   525 static GstCaps *
   553 static GstCaps *
   687       queue->buffering_iteration++;
   715       queue->buffering_iteration++;
   688       post = TRUE;
   716       post = TRUE;
   689     }
   717     }
   690   }
   718   }
   691   if (post) {
   719   if (post) {
       
   720     GstMessage *message;
       
   721     GstBufferingMode mode;
       
   722 
   692     /* scale to high percent so that it becomes the 100% mark */
   723     /* scale to high percent so that it becomes the 100% mark */
   693     percent = percent * 100 / queue->high_percent;
   724     percent = percent * 100 / queue->high_percent;
   694     /* clip */
   725     /* clip */
   695     if (percent > 100)
   726     if (percent > 100)
   696       percent = 100;
   727       percent = 100;
   697 
   728 
       
   729     if (QUEUE_IS_USING_TEMP_FILE (queue))
       
   730       mode = GST_BUFFERING_DOWNLOAD;
       
   731     else
       
   732       mode = GST_BUFFERING_STREAM;
       
   733 
   698     GST_DEBUG_OBJECT (queue, "buffering %d percent", percent);
   734     GST_DEBUG_OBJECT (queue, "buffering %d percent", percent);
   699     gst_element_post_message (GST_ELEMENT_CAST (queue),
   735     message = gst_message_new_buffering (GST_OBJECT_CAST (queue), percent);
   700         gst_message_new_buffering (GST_OBJECT_CAST (queue), percent));
   736     gst_message_set_buffering_stats (message, mode,
       
   737         queue->byte_in_rate, queue->byte_out_rate, -1);
       
   738 
       
   739     gst_element_post_message (GST_ELEMENT_CAST (queue), message);
       
   740 
   701   } else {
   741   } else {
   702     GST_DEBUG_OBJECT (queue, "filled %d percent", percent);
   742     GST_DEBUG_OBJECT (queue, "filled %d percent", percent);
   703   }
   743   }
   704 
   744 
   705 #undef GET_PERCENT
   745 #undef GET_PERCENT
   798 
   838 
   799     /* reset the values to calculate rate over the next interval */
   839     /* reset the values to calculate rate over the next interval */
   800     queue->last_out_elapsed = elapsed;
   840     queue->last_out_elapsed = elapsed;
   801     queue->bytes_out = 0;
   841     queue->bytes_out = 0;
   802   }
   842   }
       
   843   if (queue->byte_in_rate > 0.0) {
       
   844     queue->cur_level.rate_time =
       
   845         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
       
   846   }
   803   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
   847   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
   804       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
   848       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
   805 }
   849 }
   806 
   850 
   807 static void
   851 static void
   809 {
   853 {
   810   guint size;
   854   guint size;
   811   guint8 *data;
   855   guint8 *data;
   812   int ret;
   856   int ret;
   813 
   857 
       
   858 #ifdef HAVE_FSEEKO
       
   859   fseeko (queue->temp_file, (off_t) queue->writing_pos, SEEK_SET);
       
   860 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
       
   861   lseek (fileno (queue->temp_file), (off_t) queue->writing_pos, SEEK_SET);
       
   862 #else
   814   fseek (queue->temp_file, queue->writing_pos, SEEK_SET);
   863   fseek (queue->temp_file, queue->writing_pos, SEEK_SET);
       
   864 #endif
   815 
   865 
   816   data = GST_BUFFER_DATA (buffer);
   866   data = GST_BUFFER_DATA (buffer);
   817   size = GST_BUFFER_SIZE (buffer);
   867   size = GST_BUFFER_SIZE (buffer);
   818 
   868 
   819   ret = fwrite (data, 1, size, queue->temp_file);
   869   ret = fwrite (data, 1, size, queue->temp_file);
   820   if (ret < size) {
   870   if (ret < size) {
   821     /* FIXME do something useful here */
   871     /* FIXME do something useful here */
   822     GST_ERROR_OBJECT (queue, "fwrite returned error");
   872     GST_ERROR_OBJECT (queue, "fwrite returned error");
   823   }
   873   }
   824   queue->writing_pos += size;
   874   queue->writing_pos += size;
       
   875 
       
   876   if (queue->writing_pos > queue->max_reading_pos)
       
   877     queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos;
       
   878   else
       
   879     queue->cur_level.bytes = 0;
   825 }
   880 }
   826 
   881 
   827 /* see if there is enough data in the file to read a full buffer */
   882 /* see if there is enough data in the file to read a full buffer */
   828 static gboolean
   883 static gboolean
   829 gst_queue_have_data (GstQueue * queue, guint64 offset, guint length)
   884 gst_queue_have_data (GstQueue * queue, guint64 offset, guint length)
   854   }
   909   }
   855 
   910 
   856 #ifdef HAVE_FSEEKO
   911 #ifdef HAVE_FSEEKO
   857   if (fseeko (queue->temp_file, (off_t) offset, SEEK_SET) != 0)
   912   if (fseeko (queue->temp_file, (off_t) offset, SEEK_SET) != 0)
   858     goto seek_failed;
   913     goto seek_failed;
   859 #elif defined (G_OS_UNIX)
   914 #elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
   860   if (lseek (fileno (queue->temp_file), (off_t) offset,
   915   if (lseek (fileno (queue->temp_file), (off_t) offset,
   861           SEEK_SET) == (off_t) - 1)
   916           SEEK_SET) == (off_t) - 1)
   862     goto seek_failed;
   917     goto seek_failed;
   863 #else
   918 #else
   864   if (fseek (queue->temp_file, (long) offset, SEEK_SET) != 0)
   919   if (fseek (queue->temp_file, (long) offset, SEEK_SET) != 0)
   887   GST_BUFFER_OFFSET_END (buf) = offset + length;
   942   GST_BUFFER_OFFSET_END (buf) = offset + length;
   888 
   943 
   889   *buffer = buf;
   944   *buffer = buf;
   890 
   945 
   891   queue->reading_pos = offset + length;
   946   queue->reading_pos = offset + length;
       
   947   queue->max_reading_pos = MAX (queue->max_reading_pos, queue->reading_pos);
       
   948 
       
   949   if (queue->writing_pos > queue->max_reading_pos)
       
   950     queue->cur_level.bytes = queue->writing_pos - queue->max_reading_pos;
       
   951   else
       
   952     queue->cur_level.bytes = 0;
   892 
   953 
   893   return GST_FLOW_OK;
   954   return GST_FLOW_OK;
   894 
   955 
   895   /* ERRORS */
   956   /* ERRORS */
   896 out_flushing:
   957 out_flushing:
   949 }
  1010 }
   950 
  1011 
   951 static gboolean
  1012 static gboolean
   952 gst_queue_open_temp_location_file (GstQueue * queue)
  1013 gst_queue_open_temp_location_file (GstQueue * queue)
   953 {
  1014 {
   954   /* nothing to do */
  1015   gint fd = -1;
   955   if (queue->temp_location == NULL)
  1016   gchar *name = NULL;
   956     goto no_filename;
  1017 
   957 
  1018   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
   958   /* open the file for update/writing */
  1019 
   959   queue->temp_file = g_fopen (queue->temp_location, "wb+");
  1020   /* we have two cases:
   960   /* error creating file */
  1021    * - temp_location was set to something !NULL (Deprecated). in this case we
   961   if (queue->temp_file == NULL)
  1022    *   open the specified filename.
   962     goto open_failed;
  1023    * - temp_template was set, allocate a filename and open that filename
       
  1024    */
       
  1025   if (!queue->temp_location_set) {
       
  1026     /* nothing to do */
       
  1027     if (queue->temp_template == NULL)
       
  1028       goto no_directory;
       
  1029 
       
  1030     /* make copy of the template, we don't want to change this */
       
  1031     name = g_strdup (queue->temp_template);
       
  1032     fd = g_mkstemp (name);
       
  1033     if (fd == -1)
       
  1034       goto mkstemp_failed;
       
  1035 
       
  1036     /* open the file for update/writing */
       
  1037     queue->temp_file = fdopen (fd, "wb+");
       
  1038     /* error creating file */
       
  1039     if (queue->temp_file == NULL)
       
  1040       goto open_failed;
       
  1041 
       
  1042     g_free (queue->temp_location);
       
  1043     queue->temp_location = name;
       
  1044 
       
  1045     g_object_notify (G_OBJECT (queue), "temp-location");
       
  1046   } else {
       
  1047     /* open the file for update/writing, this is deprecated but we still need to
       
  1048      * support it for API/ABI compatibility */
       
  1049     queue->temp_file = g_fopen (queue->temp_location, "wb+");
       
  1050     /* error creating file */
       
  1051     if (queue->temp_file == NULL)
       
  1052       goto open_failed;
       
  1053   }
   963 
  1054 
   964   queue->writing_pos = 0;
  1055   queue->writing_pos = 0;
   965   queue->reading_pos = 0;
  1056   queue->reading_pos = 0;
       
  1057   queue->max_reading_pos = 0;
   966 
  1058 
   967   return TRUE;
  1059   return TRUE;
   968 
  1060 
   969   /* ERRORS */
  1061   /* ERRORS */
   970 no_filename:
  1062 no_directory:
   971   {
  1063   {
   972     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
  1064     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
   973         (_("No file name specified.")), (NULL));
  1065         (_("No Temp directory specified.")), (NULL));
       
  1066     return FALSE;
       
  1067   }
       
  1068 mkstemp_failed:
       
  1069   {
       
  1070     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
       
  1071         (_("Could not create temp file \"%s\"."), queue->temp_template),
       
  1072         GST_ERROR_SYSTEM);
       
  1073     g_free (name);
   974     return FALSE;
  1074     return FALSE;
   975   }
  1075   }
   976 open_failed:
  1076 open_failed:
   977   {
  1077   {
   978     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
  1078     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
   979         (_("Could not open file \"%s\" for reading."), queue->temp_location),
  1079         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
   980         GST_ERROR_SYSTEM);
  1080     g_free (name);
       
  1081     if (fd != -1)
       
  1082       close (fd);
   981     return FALSE;
  1083     return FALSE;
   982   }
  1084   }
   983 }
  1085 }
   984 
  1086 
   985 static void
  1087 static void
   986 gst_queue_close_temp_location_file (GstQueue * queue)
  1088 gst_queue_close_temp_location_file (GstQueue * queue)
   987 {
  1089 {
   988   /* nothing to do */
  1090   /* nothing to do */
   989   if (queue->temp_file == NULL)
  1091   if (queue->temp_file == NULL)
   990     return;
  1092     return;
       
  1093 
       
  1094   GST_DEBUG_OBJECT (queue, "closing temp file");
   991 
  1095 
   992   /* we don't remove the file so that the application can use it as a cache
  1096   /* we don't remove the file so that the application can use it as a cache
   993    * later on */
  1097    * later on */
   994   fflush (queue->temp_file);
  1098   fflush (queue->temp_file);
   995   fclose (queue->temp_file);
  1099   fclose (queue->temp_file);
   996   remove (queue->temp_location);
  1100   remove (queue->temp_location);
   997   queue->temp_file = NULL;
  1101   queue->temp_file = NULL;
   998 }
  1102 }
   999 
  1103 
  1000 static void
  1104 static void
       
  1105 gst_queue_flush_temp_file (GstQueue * queue)
       
  1106 {
       
  1107   if (queue->temp_file == NULL)
       
  1108     return;
       
  1109 
       
  1110   GST_DEBUG_OBJECT (queue, "flushing temp file");
       
  1111 
       
  1112   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
       
  1113 
       
  1114   queue->writing_pos = 0;
       
  1115   queue->reading_pos = 0;
       
  1116   queue->max_reading_pos = 0;
       
  1117 }
       
  1118 
       
  1119 static void
  1001 gst_queue_locked_flush (GstQueue * queue)
  1120 gst_queue_locked_flush (GstQueue * queue)
  1002 {
  1121 {
  1003   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
  1122   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
  1004     gst_queue_close_temp_location_file (queue);
  1123     gst_queue_flush_temp_file (queue);
  1005     gst_queue_open_temp_location_file (queue);
       
  1006   } else {
  1124   } else {
  1007     while (!g_queue_is_empty (queue->queue)) {
  1125     while (!g_queue_is_empty (queue->queue)) {
  1008       GstMiniObject *data = g_queue_pop_head (queue->queue);
  1126       GstMiniObject *data = g_queue_pop_head (queue->queue);
  1009 
  1127 
  1010       /* Then lose another reference because we are supposed to destroy that
  1128       /* Then lose another reference because we are supposed to destroy that
  1037 
  1155 
  1038     /* add buffer to the statistics */
  1156     /* add buffer to the statistics */
  1039     queue->cur_level.buffers++;
  1157     queue->cur_level.buffers++;
  1040     queue->cur_level.bytes += size;
  1158     queue->cur_level.bytes += size;
  1041     queue->bytes_in += size;
  1159     queue->bytes_in += size;
       
  1160 
  1042     /* apply new buffer to segment stats */
  1161     /* apply new buffer to segment stats */
  1043     apply_buffer (queue, buffer, &queue->sink_segment);
  1162     apply_buffer (queue, buffer, &queue->sink_segment);
  1044     /* update the byterate stats */
  1163     /* update the byterate stats */
  1045     update_in_rates (queue);
  1164     update_in_rates (queue);
  1046 
  1165 
  1066         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
  1185         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
  1067           if (queue->segment_event_received)
  1186           if (queue->segment_event_received)
  1068             goto unexpected_event;
  1187             goto unexpected_event;
  1069 
  1188 
  1070           queue->segment_event_received = TRUE;
  1189           queue->segment_event_received = TRUE;
       
  1190           if (queue->starting_segment != NULL)
       
  1191             gst_event_unref (queue->starting_segment);
  1071           queue->starting_segment = event;
  1192           queue->starting_segment = event;
       
  1193           item = NULL;
  1072         }
  1194         }
  1073         /* a new segment allows us to accept more buffers if we got UNEXPECTED
  1195         /* a new segment allows us to accept more buffers if we got UNEXPECTED
  1074          * from downstream */
  1196          * from downstream */
  1075         queue->unexpected = FALSE;
  1197         queue->unexpected = FALSE;
  1076         break;
  1198         break;
  1090     /* update the buffering status */
  1212     /* update the buffering status */
  1091     update_buffering (queue);
  1213     update_buffering (queue);
  1092 
  1214 
  1093     if (!QUEUE_IS_USING_TEMP_FILE (queue))
  1215     if (!QUEUE_IS_USING_TEMP_FILE (queue))
  1094       g_queue_push_tail (queue->queue, item);
  1216       g_queue_push_tail (queue->queue, item);
       
  1217     else
       
  1218       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
       
  1219 
  1095     GST_QUEUE_SIGNAL_ADD (queue);
  1220     GST_QUEUE_SIGNAL_ADD (queue);
  1096   }
  1221   }
  1097 
  1222 
  1098   return;
  1223   return;
  1099 
  1224 
  1264   /* never empty on EOS */
  1389   /* never empty on EOS */
  1265   if (queue->is_eos)
  1390   if (queue->is_eos)
  1266     return FALSE;
  1391     return FALSE;
  1267 
  1392 
  1268   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
  1393   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
  1269     return queue->writing_pos == queue->reading_pos;
  1394     return queue->writing_pos == queue->max_reading_pos;
  1270   } else {
  1395   } else {
  1271     if (queue->queue->length == 0)
  1396     if (queue->queue->length == 0)
  1272       return TRUE;
  1397       return TRUE;
  1273   }
  1398   }
  1274 
  1399 
  1284   if (queue->is_eos)
  1409   if (queue->is_eos)
  1285     return TRUE;
  1410     return TRUE;
  1286 
  1411 
  1287   /* if using file, we're never filled if we don't have EOS */
  1412   /* if using file, we're never filled if we don't have EOS */
  1288   if (QUEUE_IS_USING_TEMP_FILE (queue))
  1413   if (QUEUE_IS_USING_TEMP_FILE (queue))
       
  1414     return FALSE;
       
  1415 
       
  1416   /* we are never filled when we have no buffers at all */
       
  1417   if (queue->cur_level.buffers == 0)
  1289     return FALSE;
  1418     return FALSE;
  1290 
  1419 
  1291 #define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \
  1420 #define CHECK_FILLED(format) ((queue->max_level.format) > 0 && \
  1292 		(queue->cur_level.format) >= (queue->max_level.format))
  1421 		(queue->cur_level.format) >= (queue->max_level.format))
  1293 
  1422 
  1546 #ifndef GST_DISABLE_GST_DEBUG
  1675 #ifndef GST_DISABLE_GST_DEBUG
  1547   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
  1676   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
  1548       event, GST_EVENT_TYPE_NAME (event));
  1677       event, GST_EVENT_TYPE_NAME (event));
  1549 #endif
  1678 #endif
  1550 
  1679 
  1551   /* just forward upstream */
  1680   if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
  1552   res = gst_pad_push_event (queue->sinkpad, event);
  1681     /* just forward upstream */
       
  1682     res = gst_pad_push_event (queue->sinkpad, event);
       
  1683   } else {
       
  1684     /* when using a temp file, we unblock the pending read */
       
  1685     res = TRUE;
       
  1686     gst_event_unref (event);
       
  1687   }
  1553 
  1688 
  1554   return res;
  1689   return res;
  1555 }
  1690 }
  1556 
  1691 
  1557 static gboolean
  1692 static gboolean
  1603       gst_query_set_position (query, format, peer_pos);
  1738       gst_query_set_position (query, format, peer_pos);
  1604       break;
  1739       break;
  1605     }
  1740     }
  1606     case GST_QUERY_DURATION:
  1741     case GST_QUERY_DURATION:
  1607     {
  1742     {
  1608       GST_DEBUG_OBJECT (queue, "waiting for preroll in duration query");
  1743       GST_DEBUG_OBJECT (queue, "doing peer query");
  1609 
       
  1610       GST_QUEUE_MUTEX_LOCK (queue);
       
  1611       /* we have to wait until the upstream element is at least paused, which
       
  1612        * happened when we received a first item. */
       
  1613       while (gst_queue_is_empty (queue)) {
       
  1614         GST_QUEUE_WAIT_ADD_CHECK (queue, flushing);
       
  1615       }
       
  1616       GST_QUEUE_MUTEX_UNLOCK (queue);
       
  1617 
  1744 
  1618       if (!gst_queue_peer_query (queue, queue->sinkpad, query))
  1745       if (!gst_queue_peer_query (queue, queue->sinkpad, query))
  1619         goto peer_failed;
  1746         goto peer_failed;
  1620 
  1747 
  1621       GST_DEBUG_OBJECT (queue, "peer query success");
  1748       GST_DEBUG_OBJECT (queue, "peer query success");
       
  1749       break;
       
  1750     }
       
  1751     case GST_QUERY_BUFFERING:
       
  1752     {
       
  1753       GstFormat format;
       
  1754 
       
  1755       GST_DEBUG_OBJECT (queue, "query buffering");
       
  1756 
       
  1757       if (!QUEUE_IS_USING_TEMP_FILE (queue)) {
       
  1758         /* no temp file, just forward to the peer */
       
  1759         if (!gst_queue_peer_query (queue, queue->sinkpad, query))
       
  1760           goto peer_failed;
       
  1761         GST_DEBUG_OBJECT (queue, "buffering forwarded to peer");
       
  1762       } else {
       
  1763         gint64 start, stop;
       
  1764 
       
  1765         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
       
  1766 
       
  1767         switch (format) {
       
  1768           case GST_FORMAT_PERCENT:
       
  1769           {
       
  1770             gint64 duration;
       
  1771             GstFormat peer_fmt;
       
  1772 
       
  1773             peer_fmt = GST_FORMAT_BYTES;
       
  1774 
       
  1775             if (!gst_pad_query_peer_duration (queue->sinkpad, &peer_fmt,
       
  1776                     &duration))
       
  1777               goto peer_failed;
       
  1778 
       
  1779             GST_DEBUG_OBJECT (queue, "duration %" G_GINT64_FORMAT ", writing %"
       
  1780                 G_GINT64_FORMAT, duration, queue->writing_pos);
       
  1781 
       
  1782             start = 0;
       
  1783             /* get our available data relative to the duration */
       
  1784             if (duration != -1)
       
  1785               stop = GST_FORMAT_PERCENT_MAX * queue->writing_pos / duration;
       
  1786             else
       
  1787               stop = -1;
       
  1788             break;
       
  1789           }
       
  1790           case GST_FORMAT_BYTES:
       
  1791             start = 0;
       
  1792             stop = queue->writing_pos;
       
  1793             break;
       
  1794           default:
       
  1795             start = -1;
       
  1796             stop = -1;
       
  1797             break;
       
  1798         }
       
  1799         gst_query_set_buffering_percent (query, queue->is_buffering, 100);
       
  1800         gst_query_set_buffering_range (query, format, start, stop, -1);
       
  1801       }
  1622       break;
  1802       break;
  1623     }
  1803     }
  1624     default:
  1804     default:
  1625       /* peer handled other queries */
  1805       /* peer handled other queries */
  1626       if (!gst_queue_peer_query (queue, queue->sinkpad, query))
  1806       if (!gst_queue_peer_query (queue, queue->sinkpad, query))
  1632 
  1812 
  1633   /* ERRORS */
  1813   /* ERRORS */
  1634 peer_failed:
  1814 peer_failed:
  1635   {
  1815   {
  1636     GST_DEBUG_OBJECT (queue, "failed peer query");
  1816     GST_DEBUG_OBJECT (queue, "failed peer query");
  1637     return FALSE;
       
  1638   }
       
  1639 flushing:
       
  1640   {
       
  1641     GST_DEBUG_OBJECT (queue, "flushing while waiting for query");
       
  1642     GST_QUEUE_MUTEX_UNLOCK (queue);
       
  1643     return FALSE;
  1817     return FALSE;
  1644   }
  1818   }
  1645 }
  1819 }
  1646 
  1820 
  1647 static GstFlowReturn
  1821 static GstFlowReturn
  1679 {
  1853 {
  1680   GstQueue *queue;
  1854   GstQueue *queue;
  1681   gboolean ret;
  1855   gboolean ret;
  1682 
  1856 
  1683   queue = GST_QUEUE (gst_pad_get_parent (pad));
  1857   queue = GST_QUEUE (gst_pad_get_parent (pad));
       
  1858 
  1684   /* we can operate in pull mode when we are using a tempfile */
  1859   /* we can operate in pull mode when we are using a tempfile */
  1685   ret = QUEUE_IS_USING_TEMP_FILE (queue);
  1860   ret = QUEUE_IS_USING_TEMP_FILE (queue);
       
  1861 
  1686   gst_object_unref (GST_OBJECT (queue));
  1862   gst_object_unref (GST_OBJECT (queue));
  1687 
  1863 
  1688   return ret;
  1864   return ret;
  1689 }
  1865 }
  1690 
  1866 
  1854  * be able to preceed.
  2030  * be able to preceed.
  1855  */
  2031  */
  1856 #define QUEUE_THRESHOLD_CHANGE(q)\
  2032 #define QUEUE_THRESHOLD_CHANGE(q)\
  1857   g_cond_signal (queue->item_add);
  2033   g_cond_signal (queue->item_add);
  1858 
  2034 
  1859 static gboolean
  2035 static void
  1860 gst_queue_set_temp_location (GstQueue * queue, const gchar * location)
  2036 gst_queue_set_temp_template (GstQueue * queue, const gchar * template)
  1861 {
  2037 {
  1862   GstState state;
  2038   GstState state;
  1863 
  2039 
  1864   /* the element must be stopped in order to do this */
  2040   /* the element must be stopped in order to do this */
  1865   GST_OBJECT_LOCK (queue);
  2041   GST_OBJECT_LOCK (queue);
  1867   if (state != GST_STATE_READY && state != GST_STATE_NULL)
  2043   if (state != GST_STATE_READY && state != GST_STATE_NULL)
  1868     goto wrong_state;
  2044     goto wrong_state;
  1869   GST_OBJECT_UNLOCK (queue);
  2045   GST_OBJECT_UNLOCK (queue);
  1870 
  2046 
  1871   /* set new location */
  2047   /* set new location */
  1872   g_free (queue->temp_location);
  2048   g_free (queue->temp_template);
  1873   queue->temp_location = g_strdup (location);
  2049   queue->temp_template = g_strdup (template);
  1874 
  2050 
  1875   g_object_notify (G_OBJECT (queue), "temp-location");
  2051   return;
  1876 
       
  1877   return TRUE;
       
  1878 
  2052 
  1879 /* ERROR */
  2053 /* ERROR */
  1880 wrong_state:
  2054 wrong_state:
  1881   {
  2055   {
  1882     GST_DEBUG_OBJECT (queue, "setting temp-location in wrong state");
  2056     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
  1883     GST_OBJECT_UNLOCK (queue);
  2057     GST_OBJECT_UNLOCK (queue);
  1884     return FALSE;
       
  1885   }
  2058   }
  1886 }
  2059 }
  1887 
  2060 
  1888 static void
  2061 static void
  1889 gst_queue_set_property (GObject * object,
  2062 gst_queue_set_property (GObject * object,
  1921       queue->low_percent = g_value_get_int (value);
  2094       queue->low_percent = g_value_get_int (value);
  1922       break;
  2095       break;
  1923     case PROP_HIGH_PERCENT:
  2096     case PROP_HIGH_PERCENT:
  1924       queue->high_percent = g_value_get_int (value);
  2097       queue->high_percent = g_value_get_int (value);
  1925       break;
  2098       break;
       
  2099     case PROP_TEMP_TEMPLATE:
       
  2100       gst_queue_set_temp_template (queue, g_value_get_string (value));
       
  2101       break;
  1926     case PROP_TEMP_LOCATION:
  2102     case PROP_TEMP_LOCATION:
  1927       gst_queue_set_temp_location (queue, g_value_dup_string (value));
  2103       g_free (queue->temp_location);
       
  2104       queue->temp_location = g_value_dup_string (value);
       
  2105       /* you can set the property back to NULL to make it use the temp-tmpl
       
  2106        * property. */
       
  2107       queue->temp_location_set = queue->temp_location != NULL;
  1928       break;
  2108       break;
  1929     default:
  2109     default:
  1930       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
  2110       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
  1931       break;
  2111       break;
  1932   }
  2112   }
  1970     case PROP_LOW_PERCENT:
  2150     case PROP_LOW_PERCENT:
  1971       g_value_set_int (value, queue->low_percent);
  2151       g_value_set_int (value, queue->low_percent);
  1972       break;
  2152       break;
  1973     case PROP_HIGH_PERCENT:
  2153     case PROP_HIGH_PERCENT:
  1974       g_value_set_int (value, queue->high_percent);
  2154       g_value_set_int (value, queue->high_percent);
       
  2155       break;
       
  2156     case PROP_TEMP_TEMPLATE:
       
  2157       g_value_set_string (value, queue->temp_template);
  1975       break;
  2158       break;
  1976     case PROP_TEMP_LOCATION:
  2159     case PROP_TEMP_LOCATION:
  1977       g_value_set_string (value, queue->temp_location);
  2160       g_value_set_string (value, queue->temp_location);
  1978       break;
  2161       break;
  1979     default:
  2162     default:
  1993 
  2176 
  1994 #ifdef ENABLE_NLS
  2177 #ifdef ENABLE_NLS
  1995   GST_DEBUG ("binding text domain %s to locale dir %s", GETTEXT_PACKAGE,
  2178   GST_DEBUG ("binding text domain %s to locale dir %s", GETTEXT_PACKAGE,
  1996       LOCALEDIR);
  2179       LOCALEDIR);
  1997   bindtextdomain (GETTEXT_PACKAGE, LOCALEDIR);
  2180   bindtextdomain (GETTEXT_PACKAGE, LOCALEDIR);
       
  2181   bind_textdomain_codeset (GETTEXT_PACKAGE, "UTF-8");
  1998 #endif /* ENABLE_NLS */
  2182 #endif /* ENABLE_NLS */
  1999 
  2183 
  2000   return gst_element_register (plugin, "queue2", GST_RANK_NONE, GST_TYPE_QUEUE);
  2184   return gst_element_register (plugin, "queue2", GST_RANK_NONE, GST_TYPE_QUEUE);
  2001 }
  2185 }
  2002 
  2186