gst_plugins_base/gst-libs/gst/app/gstappsrc.c
changeset 0 0e761a78d257
child 7 567bb019e3e3
equal deleted inserted replaced
-1:000000000000 0:0e761a78d257
       
     1 /* GStreamer
       
     2  * Copyright (C) 2007 David Schleef <ds@schleef.org>
       
     3  *           (C) 2008 Wim Taymans <wim.taymans@gmail.com>
       
     4  *
       
     5  * This library is free software; you can redistribute it and/or
       
     6  * modify it under the terms of the GNU Library General Public
       
     7  * License as published by the Free Software Foundation; either
       
     8  * version 2 of the License, or (at your option) any later version.
       
     9  *
       
    10  * This library is distributed in the hope that it will be useful,
       
    11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
       
    13  * Library General Public License for more details.
       
    14  *
       
    15  * You should have received a copy of the GNU Library General Public
       
    16  * License along with this library; if not, write to the
       
    17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
       
    18  * Boston, MA 02111-1307, USA.
       
    19  */
       
    20 
       
    21 /**
       
    22  * SECTION:element-appsrc
       
    23  *
       
    24  * The appsrc element can be used by applications to insert data into a
       
    25  * GStreamer pipeline. Unlike most GStreamer elements, Appsrc provides
       
    26  * external API functions.
       
    27  *
       
    28  * For the documentation of the API, please see the
       
    29  * <link linkend="gst-plugins-base-libs-appsrc">libgstapp</link> section in the
       
    30  * GStreamer Plugins Base Libraries documentation.
       
    31  * 
       
    32  * Since: 0.10.22
       
    33  */
       
    34 
       
    35 /**
       
    36  * SECTION:gstappsrc
       
    37  * @see_also: #GstBaseSrc, appsink
       
    38  *
       
    39  * The appsrc element can be used by applications to insert data into a
       
    40  * GStreamer pipeline. Unlike most GStreamer elements, Appsrc provides
       
    41  * external API functions.
       
    42  *
       
    43  * appsrc can be used by linking to the gstappsrc.h header file to access the
       
    44  * methods or by using the appsrc action signals. For the API
       
    45  * documentation, see the documentation for libgstapp in the
       
    46  * GStreamer Base Plugins Library reference...
       
    47  *
       
    48  * Before operating appsrc, the caps property must be set to a fixed caps
       
    49  * describing the format of the data that will be pushed with appsrc.
       
    50  *
       
    51  * The main way of handing data to the appsrc element is by calling the
       
    52  * gst_app_src_push_buffer() method or by emiting the push-buffer action signal.
       
    53  * This will put the buffer onto a queue from which appsrc will read from in its
       
    54  * streaming thread. It is important to note that data transport will not happen
       
    55  * from the thread that performed the push-buffer call.
       
    56  *
       
    57  * The "max-bytes" property controls how much data can be queued in appsrc
       
    58  * before appsrc considers the queue full. A filled internal queue will always
       
    59  * signal the "enough-data" signal, which signals the application that it should
       
    60  * stop pushing data into appsrc. The "block" property will cause appsrc to
       
    61  * block the push-buffer method until free data becomes available again.
       
    62  *
       
    63  * When the internal queue is running out of data, the "need-data" signal is
       
    64  * emited, which signals the application that it should start pushing more data
       
    65  * into appsrc.
       
    66  *
       
    67  * In addition to the "need-data" and "enough-data" signals, appsrc can emit the
       
    68  * "seek-data" signal when the "stream-mode" property is set to "seekable" or
       
    69  * "random-access". The signal argument will contain the new desired position in
       
    70  * the stream expressed in the unit set with the "format" property. After
       
    71  * receiving the seek-data signal, the application should push-buffers from the
       
    72  * new position.
       
    73  *
       
    74  * These signals allow the application to operate the appsrc in two different
       
    75  * ways:
       
    76  *
       
    77  * The push model, in which the application repeadedly calls the push-buffer method
       
    78  * with a new buffer. Optionally, the queue size in the appsrc can be controlled
       
    79  * with the enough-data and need-data signals by respectively stopping/starting
       
    80  * the push-buffer calls. This is a typical mode of operation for the
       
    81  * stream-type "stream" and "seekable". Use this model when implementing various
       
    82  * network protocols or hardware devices.
       
    83  *
       
    84  * The pull model where the need-data signal triggers the next push-buffer call.
       
    85  * This mode is typically used in the "random-access" stream-type. Use this
       
    86  * model for file access or other randomly accessable sources. In this mode, a
       
    87  * buffer of exactly the amount of bytes given by the need-data signal should be
       
    88  * pushed into appsrc.
       
    89  *
       
    90  * In all modes, the size property on appsrc should contain the total stream
       
    91  * size in bytes. Setting this property is mandatory in the random-access mode.
       
    92  * For the stream and seekable modes, setting this property is optional but
       
    93  * recommended.
       
    94  *
       
    95  * When the application is finished pushing data into appsrc, it should call 
       
    96  * gst_app_src_end_of_stream() or emit the end-of-stream action signal. After
       
    97  * this call, no more buffers can be pushed into appsrc until a flushing seek
       
    98  * happened or the state of the appsrc has gone through READY.
       
    99  *
       
   100  * Last reviewed on 2008-12-17 (0.10.10)
       
   101  *
       
   102  * Since: 0.10.22
       
   103  */
       
   104 
       
   105 #ifdef HAVE_CONFIG_H
       
   106 #include "config.h"
       
   107 #endif
       
   108 
       
   109 #include <gst/gst.h>
       
   110 #include <gst/base/gstbasesrc.h>
       
   111 
       
   112 #include <string.h>
       
   113 
       
   114 #include "gstapp-marshal.h"
       
   115 #include "gstappsrc.h"
       
   116 #ifdef __SYMBIAN32__
       
   117 #include <glib_global.h>
       
   118 #endif
       
   119 
       
   120 struct _GstAppSrcPrivate
       
   121 {
       
   122   GCond *cond;
       
   123   GMutex *mutex;
       
   124   GQueue *queue;
       
   125 
       
   126   GstCaps *caps;
       
   127   gint64 size;
       
   128   GstAppStreamType stream_type;
       
   129   guint64 max_bytes;
       
   130   GstFormat format;
       
   131   gboolean block;
       
   132 
       
   133   gboolean flushing;
       
   134   gboolean started;
       
   135   gboolean is_eos;
       
   136   guint64 queued_bytes;
       
   137   guint64 offset;
       
   138   GstAppStreamType current_type;
       
   139 
       
   140   guint64 min_latency;
       
   141   guint64 max_latency;
       
   142   gboolean emit_signals;
       
   143 
       
   144   GstAppSrcCallbacks callbacks;
       
   145   gpointer user_data;
       
   146   GDestroyNotify notify;
       
   147 };
       
   148 
       
   149 GST_DEBUG_CATEGORY_STATIC (app_src_debug);
       
   150 #define GST_CAT_DEFAULT app_src_debug
       
   151 
       
   152 enum
       
   153 {
       
   154   /* signals */
       
   155   SIGNAL_NEED_DATA,
       
   156   SIGNAL_ENOUGH_DATA,
       
   157   SIGNAL_SEEK_DATA,
       
   158 
       
   159   /* actions */
       
   160   SIGNAL_PUSH_BUFFER,
       
   161   SIGNAL_END_OF_STREAM,
       
   162 
       
   163   LAST_SIGNAL
       
   164 };
       
   165 
       
   166 #define DEFAULT_PROP_SIZE          -1
       
   167 #define DEFAULT_PROP_STREAM_TYPE   GST_APP_STREAM_TYPE_STREAM
       
   168 #define DEFAULT_PROP_MAX_BYTES     200000
       
   169 #define DEFAULT_PROP_FORMAT        GST_FORMAT_BYTES
       
   170 #define DEFAULT_PROP_BLOCK         FALSE
       
   171 #define DEFAULT_PROP_IS_LIVE       FALSE
       
   172 #define DEFAULT_PROP_MIN_LATENCY   -1
       
   173 #define DEFAULT_PROP_MAX_LATENCY   -1
       
   174 #define DEFAULT_PROP_EMIT_SIGNALS  TRUE
       
   175 
       
   176 enum
       
   177 {
       
   178   PROP_0,
       
   179   PROP_CAPS,
       
   180   PROP_SIZE,
       
   181   PROP_STREAM_TYPE,
       
   182   PROP_MAX_BYTES,
       
   183   PROP_FORMAT,
       
   184   PROP_BLOCK,
       
   185   PROP_IS_LIVE,
       
   186   PROP_MIN_LATENCY,
       
   187   PROP_MAX_LATENCY,
       
   188   PROP_EMIT_SIGNALS,
       
   189   PROP_LAST
       
   190 };
       
   191 
       
   192 static GstStaticPadTemplate gst_app_src_template =
       
   193 GST_STATIC_PAD_TEMPLATE ("src",
       
   194     GST_PAD_SRC,
       
   195     GST_PAD_ALWAYS,
       
   196     GST_STATIC_CAPS_ANY);
       
   197 
       
   198 
       
   199 #define GST_TYPE_APP_STREAM_TYPE (stream_type_get_type ())
       
   200 static GType
       
   201 stream_type_get_type (void)
       
   202 {
       
   203   static GType stream_type_type = 0;
       
   204   static const GEnumValue stream_type[] = {
       
   205     {GST_APP_STREAM_TYPE_STREAM, "Stream", "stream"},
       
   206     {GST_APP_STREAM_TYPE_SEEKABLE, "Seekable", "seekable"},
       
   207     {GST_APP_STREAM_TYPE_RANDOM_ACCESS, "Random Access", "random-access"},
       
   208     {0, NULL, NULL},
       
   209   };
       
   210 
       
   211   if (!stream_type_type) {
       
   212     stream_type_type = g_enum_register_static ("GstAppStreamType", stream_type);
       
   213   }
       
   214   return stream_type_type;
       
   215 }
       
   216 
       
   217 static void gst_app_src_uri_handler_init (gpointer g_iface,
       
   218     gpointer iface_data);
       
   219 
       
   220 static void gst_app_src_dispose (GObject * object);
       
   221 static void gst_app_src_finalize (GObject * object);
       
   222 
       
   223 #ifdef __SYMBIAN32__
       
   224 static void gst_app_src_base_init (gpointer g_class);
       
   225 static void gst_app_src_class_init (GstAppSrcClass * klass);
       
   226 static void gst_app_src_init (GstAppSrc * appsrc, GstAppSrcClass * klass);
       
   227 #endif
       
   228 
       
   229 static void gst_app_src_set_property (GObject * object, guint prop_id,
       
   230     const GValue * value, GParamSpec * pspec);
       
   231 static void gst_app_src_get_property (GObject * object, guint prop_id,
       
   232     GValue * value, GParamSpec * pspec);
       
   233 
       
   234 static void gst_app_src_set_latencies (GstAppSrc * appsrc,
       
   235     gboolean do_min, guint64 min, gboolean do_max, guint64 max);
       
   236 
       
   237 static GstFlowReturn gst_app_src_create (GstBaseSrc * bsrc,
       
   238     guint64 offset, guint size, GstBuffer ** buf);
       
   239 static gboolean gst_app_src_start (GstBaseSrc * bsrc);
       
   240 static gboolean gst_app_src_stop (GstBaseSrc * bsrc);
       
   241 static gboolean gst_app_src_unlock (GstBaseSrc * bsrc);
       
   242 static gboolean gst_app_src_unlock_stop (GstBaseSrc * bsrc);
       
   243 static gboolean gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment);
       
   244 static gboolean gst_app_src_is_seekable (GstBaseSrc * src);
       
   245 static gboolean gst_app_src_check_get_range (GstBaseSrc * src);
       
   246 static gboolean gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size);
       
   247 static gboolean gst_app_src_query (GstBaseSrc * src, GstQuery * query);
       
   248 
       
   249 static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
       
   250     GstBuffer * buffer);
       
   251 
       
   252 static guint gst_app_src_signals[LAST_SIGNAL] = { 0 };
       
   253 
       
   254 static void
       
   255 _do_init (GType filesrc_type)
       
   256 {
       
   257   static const GInterfaceInfo urihandler_info = {
       
   258     gst_app_src_uri_handler_init,
       
   259     NULL,
       
   260     NULL
       
   261   };
       
   262   g_type_add_interface_static (filesrc_type, GST_TYPE_URI_HANDLER,
       
   263       &urihandler_info);
       
   264 }
       
   265 
       
   266 #ifndef __SYMBIAN32__
       
   267 GST_BOILERPLATE_FULL (GstAppSrc, gst_app_src, GstBaseSrc, GST_TYPE_BASE_SRC,
       
   268     _do_init);
       
   269 #else
       
   270 
       
   271 static GstBaseSrcClass *parent_class = NULL;           
       
   272 static void
       
   273 gst_app_src_class_init_trampoline (gpointer g_class,
       
   274                         gpointer data)
       
   275 {                                   
       
   276   parent_class = (GstBaseSrcClass *)               
       
   277       g_type_class_peek_parent (g_class);              
       
   278       gst_app_src_class_init ((GstAppSrcClass *)g_class);       
       
   279 }
       
   280 
       
   281 EXPORT_C GType
       
   282 gst_app_src_get_type (void)
       
   283 {
       
   284 static GType object_type = 0;                      
       
   285  if (G_UNLIKELY (object_type == 0)) {                  
       
   286    object_type = gst_type_register_static_full(GST_TYPE_BASE_SRC,
       
   287        "GstAppSrc",
       
   288        sizeof (GstAppSrcClass),
       
   289        (GBaseInitFunc) gst_app_src_base_init,
       
   290        NULL,
       
   291        (GClassInitFunc) gst_app_src_class_init_trampoline,
       
   292        NULL,       
       
   293        NULL,
       
   294        sizeof (GstAppSrc),   
       
   295        0,
       
   296        (GInstanceInitFunc) gst_app_src_init,
       
   297        NULL,        
       
   298        (GTypeFlags) 0);
       
   299      _do_init (object_type);               
       
   300  }                                 
       
   301  return object_type;               
       
   302 }
       
   303 #endif
       
   304 
       
   305 
       
   306 
       
   307 static void
       
   308 gst_app_src_base_init (gpointer g_class)
       
   309 {
       
   310   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
       
   311 
       
   312   GST_DEBUG_CATEGORY_INIT (app_src_debug, "appsrc", 0, "appsrc element");
       
   313 
       
   314   gst_element_class_set_details_simple (element_class, "AppSrc",
       
   315       "Generic/Src", "Allow the application to feed buffers to a pipeline",
       
   316       "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
       
   317 
       
   318   gst_element_class_add_pad_template (element_class,
       
   319       gst_static_pad_template_get (&gst_app_src_template));
       
   320 }
       
   321 
       
   322 static void
       
   323 gst_app_src_class_init (GstAppSrcClass * klass)
       
   324 {
       
   325   GObjectClass *gobject_class = (GObjectClass *) klass;
       
   326   GstBaseSrcClass *basesrc_class = (GstBaseSrcClass *) klass;
       
   327 
       
   328   gobject_class->dispose = gst_app_src_dispose;
       
   329   gobject_class->finalize = gst_app_src_finalize;
       
   330 
       
   331   gobject_class->set_property = gst_app_src_set_property;
       
   332   gobject_class->get_property = gst_app_src_get_property;
       
   333 
       
   334   /**
       
   335    * GstAppSrc::caps
       
   336    *
       
   337    * The GstCaps that will negotiated downstream and will be put
       
   338    * on outgoing buffers.
       
   339    */
       
   340   g_object_class_install_property (gobject_class, PROP_CAPS,
       
   341       g_param_spec_boxed ("caps", "Caps",
       
   342           "The allowed caps for the src pad", GST_TYPE_CAPS,
       
   343           G_PARAM_READWRITE));
       
   344   /**
       
   345    * GstAppSrc::format
       
   346    *
       
   347    * The format to use for segment events. When the source is producing
       
   348    * timestamped buffers this property should be set to GST_FORMAT_TIME.
       
   349    */
       
   350   g_object_class_install_property (gobject_class, PROP_FORMAT,
       
   351       g_param_spec_enum ("format", "Format",
       
   352           "The format of the segment events and seek", GST_TYPE_FORMAT,
       
   353           DEFAULT_PROP_FORMAT, G_PARAM_READWRITE));
       
   354   /**
       
   355    * GstAppSrc::size
       
   356    *
       
   357    * The total size in bytes of the data stream. If the total size is known, it
       
   358    * is recommended to configure it with this property.
       
   359    */
       
   360   g_object_class_install_property (gobject_class, PROP_SIZE,
       
   361       g_param_spec_int64 ("size", "Size",
       
   362           "The size of the data stream in bytes (-1 if unknown)",
       
   363           -1, G_MAXINT64, DEFAULT_PROP_SIZE,
       
   364           G_PARAM_READWRITE));
       
   365   /**
       
   366    * GstAppSrc::stream-type
       
   367    *
       
   368    * The type of stream that this source is producing.  For seekable streams the
       
   369    * application should connect to the seek-data signal.
       
   370    */
       
   371   g_object_class_install_property (gobject_class, PROP_STREAM_TYPE,
       
   372       g_param_spec_enum ("stream-type", "Stream Type",
       
   373           "the type of the stream", GST_TYPE_APP_STREAM_TYPE,
       
   374           DEFAULT_PROP_STREAM_TYPE,
       
   375           G_PARAM_READWRITE));
       
   376   /**
       
   377    * GstAppSrc::max-bytes
       
   378    *
       
   379    * The maximum amount of bytes that can be queued internally.
       
   380    * After the maximum amount of bytes are queued, appsrc will emit the
       
   381    * "enough-data" signal.
       
   382    */
       
   383   g_object_class_install_property (gobject_class, PROP_MAX_BYTES,
       
   384       g_param_spec_uint64 ("max-bytes", "Max bytes",
       
   385           "The maximum number of bytes to queue internally (0 = unlimited)",
       
   386           0, G_MAXUINT64, DEFAULT_PROP_MAX_BYTES,
       
   387           G_PARAM_READWRITE));
       
   388   /**
       
   389    * GstAppSrc::block
       
   390    *
       
   391    * When max-bytes are queued and after the enough-data signal has been emited,
       
   392    * block any further push-buffer calls until the amount of queued bytes drops
       
   393    * below the max-bytes limit.
       
   394    */
       
   395   g_object_class_install_property (gobject_class, PROP_BLOCK,
       
   396       g_param_spec_boolean ("block", "Block",
       
   397           "Block push-buffer when max-bytes are queued",
       
   398           DEFAULT_PROP_BLOCK, G_PARAM_READWRITE));
       
   399 
       
   400   /**
       
   401    * GstAppSrc::is-live
       
   402    *
       
   403    * Instruct the source to behave like a live source. This includes that it
       
   404    * will only push out buffers in the PLAYING state.
       
   405    */
       
   406   g_object_class_install_property (gobject_class, PROP_IS_LIVE,
       
   407       g_param_spec_boolean ("is-live", "Is Live",
       
   408           "Whether to act as a live source",
       
   409           DEFAULT_PROP_IS_LIVE, G_PARAM_READWRITE ));
       
   410   /**
       
   411    * GstAppSrc::min-latency
       
   412    *
       
   413    * The minimum latency of the source. A value of -1 will use the default
       
   414    * latency calculations of #GstBaseSrc.
       
   415    */
       
   416   g_object_class_install_property (gobject_class, PROP_MIN_LATENCY,
       
   417       g_param_spec_int64 ("min-latency", "Min Latency",
       
   418           "The minimum latency (-1 = default)",
       
   419           -1, G_MAXINT64, DEFAULT_PROP_MIN_LATENCY,
       
   420           G_PARAM_READWRITE));
       
   421   /**
       
   422    * GstAppSrc::max-latency
       
   423    *
       
   424    * The maximum latency of the source. A value of -1 means an unlimited amout
       
   425    * of latency.
       
   426    */
       
   427   g_object_class_install_property (gobject_class, PROP_MAX_LATENCY,
       
   428       g_param_spec_int64 ("max-latency", "Max Latency",
       
   429           "The maximum latency (-1 = unlimited)",
       
   430           -1, G_MAXINT64, DEFAULT_PROP_MAX_LATENCY,
       
   431           G_PARAM_READWRITE));
       
   432 
       
   433   /**
       
   434    * GstAppSrc::emit-signals
       
   435    *
       
   436    * Make appsrc emit the "need-data", "enough-data" and "seek-data" signals.
       
   437    * This option is by default enabled for backwards compatibility reasons but
       
   438    * can disabled when needed because signal emission is expensive.
       
   439    *
       
   440    * Since: 0.10.23
       
   441    */
       
   442   g_object_class_install_property (gobject_class, PROP_EMIT_SIGNALS,
       
   443       g_param_spec_boolean ("emit-signals", "Emit signals",
       
   444           "Emit new-preroll and new-buffer signals", DEFAULT_PROP_EMIT_SIGNALS,
       
   445           G_PARAM_READWRITE));
       
   446 
       
   447   /**
       
   448    * GstAppSrc::need-data:
       
   449    * @appsrc: the appsrc element that emited the signal
       
   450    * @length: the amount of bytes needed.
       
   451    *
       
   452    * Signal that the source needs more data. In the callback or from another
       
   453    * thread you should call push-buffer or end-of-stream.
       
   454    *
       
   455    * @length is just a hint and when it is set to -1, any number of bytes can be
       
   456    * pushed into @appsrc.
       
   457    *
       
   458    * You can call push-buffer multiple times until the enough-data signal is
       
   459    * fired.
       
   460    */
       
   461   gst_app_src_signals[SIGNAL_NEED_DATA] =
       
   462       g_signal_new ("need-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
       
   463       G_STRUCT_OFFSET (GstAppSrcClass, need_data),
       
   464       NULL, NULL, __gst_app_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
       
   465 
       
   466   /**
       
   467    * GstAppSrc::enough-data:
       
   468    * @appsrc: the appsrc element that emited the signal
       
   469    *
       
   470    * Signal that the source has enough data. It is recommended that the
       
   471    * application stops calling push-buffer until the need-data signal is
       
   472    * emited again to avoid excessive buffer queueing.
       
   473    */
       
   474   gst_app_src_signals[SIGNAL_ENOUGH_DATA] =
       
   475       g_signal_new ("enough-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
       
   476       G_STRUCT_OFFSET (GstAppSrcClass, enough_data),
       
   477       NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
       
   478 
       
   479   /**
       
   480    * GstAppSrc::seek-data:
       
   481    * @appsrc: the appsrc element that emited the signal
       
   482    * @offset: the offset to seek to
       
   483    *
       
   484    * Seek to the given offset. The next push-buffer should produce buffers from
       
   485    * the new @offset.
       
   486    * This callback is only called for seekable stream types.
       
   487    *
       
   488    * Returns: %TRUE if the seek succeeded.
       
   489    */
       
   490   gst_app_src_signals[SIGNAL_SEEK_DATA] =
       
   491       g_signal_new ("seek-data", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
       
   492       G_STRUCT_OFFSET (GstAppSrcClass, seek_data),
       
   493       NULL, NULL, __gst_app_marshal_BOOLEAN__UINT64, G_TYPE_BOOLEAN, 1,
       
   494       G_TYPE_UINT64);
       
   495 
       
   496    /**
       
   497     * GstAppSrc::push-buffer:
       
   498     * @appsrc: the appsrc
       
   499     * @buffer: a buffer to push
       
   500     *
       
   501     * Adds a buffer to the queue of buffers that the appsrc element will
       
   502     * push to its source pad. This function does not take ownership of the
       
   503     * buffer so the buffer needs to be unreffed after calling this function.
       
   504     *
       
   505     * When the block property is TRUE, this function can block until free space
       
   506     * becomes available in the queue.
       
   507     */
       
   508   gst_app_src_signals[SIGNAL_PUSH_BUFFER] =
       
   509       g_signal_new ("push-buffer", G_TYPE_FROM_CLASS (klass),
       
   510       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
       
   511           push_buffer), NULL, NULL, __gst_app_marshal_ENUM__OBJECT,
       
   512       GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
       
   513 
       
   514    /**
       
   515     * GstAppSrc::end-of-stream:
       
   516     * @appsrc: the appsrc
       
   517     *
       
   518     * Notify @appsrc that no more buffer are available. 
       
   519     */
       
   520   gst_app_src_signals[SIGNAL_END_OF_STREAM] =
       
   521       g_signal_new ("end-of-stream", G_TYPE_FROM_CLASS (klass),
       
   522       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
       
   523           end_of_stream), NULL, NULL, __gst_app_marshal_ENUM__VOID,
       
   524       GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
       
   525 
       
   526   basesrc_class->create = gst_app_src_create;
       
   527   basesrc_class->start = gst_app_src_start;
       
   528   basesrc_class->stop = gst_app_src_stop;
       
   529   basesrc_class->unlock = gst_app_src_unlock;
       
   530   basesrc_class->unlock_stop = gst_app_src_unlock_stop;
       
   531   basesrc_class->do_seek = gst_app_src_do_seek;
       
   532   basesrc_class->is_seekable = gst_app_src_is_seekable;
       
   533   basesrc_class->check_get_range = gst_app_src_check_get_range;
       
   534   basesrc_class->get_size = gst_app_src_do_get_size;
       
   535   basesrc_class->get_size = gst_app_src_do_get_size;
       
   536   basesrc_class->query = gst_app_src_query;
       
   537 
       
   538   klass->push_buffer = gst_app_src_push_buffer_action;
       
   539   klass->end_of_stream = gst_app_src_end_of_stream;
       
   540 
       
   541   g_type_class_add_private (klass, sizeof (GstAppSrcPrivate));
       
   542 }
       
   543 
       
   544 static void
       
   545 gst_app_src_init (GstAppSrc * appsrc, GstAppSrcClass * klass)
       
   546 {
       
   547   appsrc->priv = G_TYPE_INSTANCE_GET_PRIVATE (appsrc, GST_TYPE_APP_SRC,
       
   548       GstAppSrcPrivate);
       
   549 
       
   550   appsrc->priv->mutex = g_mutex_new ();
       
   551   appsrc->priv->cond = g_cond_new ();
       
   552   appsrc->priv->queue = g_queue_new ();
       
   553 
       
   554   appsrc->priv->size = DEFAULT_PROP_SIZE;
       
   555   appsrc->priv->stream_type = DEFAULT_PROP_STREAM_TYPE;
       
   556   appsrc->priv->max_bytes = DEFAULT_PROP_MAX_BYTES;
       
   557   appsrc->priv->format = DEFAULT_PROP_FORMAT;
       
   558   appsrc->priv->block = DEFAULT_PROP_BLOCK;
       
   559   appsrc->priv->min_latency = DEFAULT_PROP_MIN_LATENCY;
       
   560   appsrc->priv->max_latency = DEFAULT_PROP_MAX_LATENCY;
       
   561   appsrc->priv->emit_signals = DEFAULT_PROP_EMIT_SIGNALS;
       
   562 
       
   563   gst_base_src_set_live (GST_BASE_SRC (appsrc), DEFAULT_PROP_IS_LIVE);
       
   564 }
       
   565 
       
   566 static void
       
   567 gst_app_src_flush_queued (GstAppSrc * src)
       
   568 {
       
   569   GstBuffer *buf;
       
   570 
       
   571   while ((buf = g_queue_pop_head (src->priv->queue)))
       
   572     gst_buffer_unref (buf);
       
   573   src->priv->queued_bytes = 0;
       
   574 }
       
   575 
       
   576 static void
       
   577 gst_app_src_dispose (GObject * obj)
       
   578 {
       
   579   GstAppSrc *appsrc = GST_APP_SRC (obj);
       
   580 
       
   581   if (appsrc->priv->caps) {
       
   582     gst_caps_unref (appsrc->priv->caps);
       
   583     appsrc->priv->caps = NULL;
       
   584   }
       
   585   gst_app_src_flush_queued (appsrc);
       
   586 
       
   587   G_OBJECT_CLASS (parent_class)->dispose (obj);
       
   588 }
       
   589 
       
   590 static void
       
   591 gst_app_src_finalize (GObject * obj)
       
   592 {
       
   593   GstAppSrc *appsrc = GST_APP_SRC (obj);
       
   594 
       
   595   g_mutex_free (appsrc->priv->mutex);
       
   596   g_cond_free (appsrc->priv->cond);
       
   597   g_queue_free (appsrc->priv->queue);
       
   598 
       
   599   G_OBJECT_CLASS (parent_class)->finalize (obj);
       
   600 }
       
   601 
       
   602 static void
       
   603 gst_app_src_set_property (GObject * object, guint prop_id,
       
   604     const GValue * value, GParamSpec * pspec)
       
   605 {
       
   606   GstAppSrc *appsrc = GST_APP_SRC (object);
       
   607 
       
   608   switch (prop_id) {
       
   609     case PROP_CAPS:
       
   610       gst_app_src_set_caps (appsrc, gst_value_get_caps (value));
       
   611       break;
       
   612     case PROP_SIZE:
       
   613       gst_app_src_set_size (appsrc, g_value_get_int64 (value));
       
   614       break;
       
   615     case PROP_STREAM_TYPE:
       
   616       gst_app_src_set_stream_type (appsrc, g_value_get_enum (value));
       
   617       break;
       
   618     case PROP_MAX_BYTES:
       
   619       gst_app_src_set_max_bytes (appsrc, g_value_get_uint64 (value));
       
   620       break;
       
   621     case PROP_FORMAT:
       
   622       appsrc->priv->format = g_value_get_enum (value);
       
   623       break;
       
   624     case PROP_BLOCK:
       
   625       appsrc->priv->block = g_value_get_boolean (value);
       
   626       break;
       
   627     case PROP_IS_LIVE:
       
   628       gst_base_src_set_live (GST_BASE_SRC (appsrc),
       
   629           g_value_get_boolean (value));
       
   630       break;
       
   631     case PROP_MIN_LATENCY:
       
   632       gst_app_src_set_latencies (appsrc, TRUE, g_value_get_int64 (value),
       
   633           FALSE, -1);
       
   634       break;
       
   635     case PROP_MAX_LATENCY:
       
   636       gst_app_src_set_latencies (appsrc, FALSE, -1, TRUE,
       
   637           g_value_get_int64 (value));
       
   638       break;
       
   639     case PROP_EMIT_SIGNALS:
       
   640       gst_app_src_set_emit_signals (appsrc, g_value_get_boolean (value));
       
   641       break;
       
   642     default:
       
   643       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       
   644       break;
       
   645   }
       
   646 }
       
   647 
       
   648 static void
       
   649 gst_app_src_get_property (GObject * object, guint prop_id, GValue * value,
       
   650     GParamSpec * pspec)
       
   651 {
       
   652   GstAppSrc *appsrc = GST_APP_SRC (object);
       
   653 
       
   654   switch (prop_id) {
       
   655     case PROP_CAPS:
       
   656     {
       
   657       GstCaps *caps;
       
   658 
       
   659       /* we're missing a _take_caps() function to transfer ownership */
       
   660       caps = gst_app_src_get_caps (appsrc);
       
   661       gst_value_set_caps (value, caps);
       
   662       if (caps)
       
   663         gst_caps_unref (caps);
       
   664       break;
       
   665     }
       
   666     case PROP_SIZE:
       
   667       g_value_set_int64 (value, gst_app_src_get_size (appsrc));
       
   668       break;
       
   669     case PROP_STREAM_TYPE:
       
   670       g_value_set_enum (value, gst_app_src_get_stream_type (appsrc));
       
   671       break;
       
   672     case PROP_MAX_BYTES:
       
   673       g_value_set_uint64 (value, gst_app_src_get_max_bytes (appsrc));
       
   674       break;
       
   675     case PROP_FORMAT:
       
   676       g_value_set_enum (value, appsrc->priv->format);
       
   677       break;
       
   678     case PROP_BLOCK:
       
   679       g_value_set_boolean (value, appsrc->priv->block);
       
   680       break;
       
   681     case PROP_IS_LIVE:
       
   682       g_value_set_boolean (value, gst_base_src_is_live (GST_BASE_SRC (appsrc)));
       
   683       break;
       
   684     case PROP_MIN_LATENCY:
       
   685     {
       
   686       guint64 min;
       
   687 
       
   688       gst_app_src_get_latency (appsrc, &min, NULL);
       
   689       g_value_set_int64 (value, min);
       
   690       break;
       
   691     }
       
   692     case PROP_MAX_LATENCY:
       
   693     {
       
   694       guint64 max;
       
   695 
       
   696       gst_app_src_get_latency (appsrc, &max, NULL);
       
   697       g_value_set_int64 (value, max);
       
   698       break;
       
   699     }
       
   700     case PROP_EMIT_SIGNALS:
       
   701       g_value_set_boolean (value, gst_app_src_get_emit_signals (appsrc));
       
   702       break;
       
   703     default:
       
   704       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       
   705       break;
       
   706   }
       
   707 }
       
   708 
       
   709 static gboolean
       
   710 gst_app_src_unlock (GstBaseSrc * bsrc)
       
   711 {
       
   712   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
       
   713 
       
   714   g_mutex_lock (appsrc->priv->mutex);
       
   715   GST_DEBUG_OBJECT (appsrc, "unlock start");
       
   716   appsrc->priv->flushing = TRUE;
       
   717   g_cond_broadcast (appsrc->priv->cond);
       
   718   g_mutex_unlock (appsrc->priv->mutex);
       
   719 
       
   720   return TRUE;
       
   721 }
       
   722 
       
   723 static gboolean
       
   724 gst_app_src_unlock_stop (GstBaseSrc * bsrc)
       
   725 {
       
   726   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
       
   727 
       
   728   g_mutex_lock (appsrc->priv->mutex);
       
   729   GST_DEBUG_OBJECT (appsrc, "unlock stop");
       
   730   appsrc->priv->flushing = FALSE;
       
   731   g_cond_broadcast (appsrc->priv->cond);
       
   732   g_mutex_unlock (appsrc->priv->mutex);
       
   733 
       
   734   return TRUE;
       
   735 }
       
   736 
       
   737 static gboolean
       
   738 gst_app_src_start (GstBaseSrc * bsrc)
       
   739 {
       
   740   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
       
   741 
       
   742   g_mutex_lock (appsrc->priv->mutex);
       
   743   GST_DEBUG_OBJECT (appsrc, "starting");
       
   744   appsrc->priv->started = TRUE;
       
   745   /* set the offset to -1 so that we always do a first seek. This is only used
       
   746    * in random-access mode. */
       
   747   appsrc->priv->offset = -1;
       
   748   appsrc->priv->flushing = FALSE;
       
   749   g_mutex_unlock (appsrc->priv->mutex);
       
   750 
       
   751   gst_base_src_set_format (bsrc, appsrc->priv->format);
       
   752 
       
   753   return TRUE;
       
   754 }
       
   755 
       
   756 static gboolean
       
   757 gst_app_src_stop (GstBaseSrc * bsrc)
       
   758 {
       
   759   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
       
   760 
       
   761   g_mutex_lock (appsrc->priv->mutex);
       
   762   GST_DEBUG_OBJECT (appsrc, "stopping");
       
   763   appsrc->priv->is_eos = FALSE;
       
   764   appsrc->priv->flushing = TRUE;
       
   765   appsrc->priv->started = FALSE;
       
   766   gst_app_src_flush_queued (appsrc);
       
   767   g_mutex_unlock (appsrc->priv->mutex);
       
   768 
       
   769   return TRUE;
       
   770 }
       
   771 
       
   772 static gboolean
       
   773 gst_app_src_is_seekable (GstBaseSrc * src)
       
   774 {
       
   775   GstAppSrc *appsrc = GST_APP_SRC (src);
       
   776   gboolean res = FALSE;
       
   777 
       
   778   switch (appsrc->priv->stream_type) {
       
   779     case GST_APP_STREAM_TYPE_STREAM:
       
   780       break;
       
   781     case GST_APP_STREAM_TYPE_SEEKABLE:
       
   782     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
       
   783       res = TRUE;
       
   784       break;
       
   785   }
       
   786   return res;
       
   787 }
       
   788 
       
   789 static gboolean
       
   790 gst_app_src_check_get_range (GstBaseSrc * src)
       
   791 {
       
   792   GstAppSrc *appsrc = GST_APP_SRC (src);
       
   793   gboolean res = FALSE;
       
   794 
       
   795   switch (appsrc->priv->stream_type) {
       
   796     case GST_APP_STREAM_TYPE_STREAM:
       
   797     case GST_APP_STREAM_TYPE_SEEKABLE:
       
   798       break;
       
   799     case GST_APP_STREAM_TYPE_RANDOM_ACCESS:
       
   800       res = TRUE;
       
   801       break;
       
   802   }
       
   803   return res;
       
   804 }
       
   805 
       
   806 static gboolean
       
   807 gst_app_src_do_get_size (GstBaseSrc * src, guint64 * size)
       
   808 {
       
   809   GstAppSrc *appsrc = GST_APP_SRC (src);
       
   810 
       
   811   *size = gst_app_src_get_size (appsrc);
       
   812 
       
   813   return TRUE;
       
   814 }
       
   815 
       
   816 static gboolean
       
   817 gst_app_src_query (GstBaseSrc * src, GstQuery * query)
       
   818 {
       
   819   GstAppSrc *appsrc = GST_APP_SRC (src);
       
   820   gboolean res;
       
   821 
       
   822   switch (GST_QUERY_TYPE (query)) {
       
   823     case GST_QUERY_LATENCY:
       
   824     {
       
   825       GstClockTime min, max;
       
   826       gboolean live;
       
   827 
       
   828       /* Query the parent class for the defaults */
       
   829       res = gst_base_src_query_latency (src, &live, &min, &max);
       
   830 
       
   831       /* overwrite with our values when we need to */
       
   832       g_mutex_lock (appsrc->priv->mutex);
       
   833       if (appsrc->priv->min_latency != -1)
       
   834         min = appsrc->priv->min_latency;
       
   835       if (appsrc->priv->max_latency != -1)
       
   836         max = appsrc->priv->max_latency;
       
   837       g_mutex_unlock (appsrc->priv->mutex);
       
   838 
       
   839       gst_query_set_latency (query, live, min, max);
       
   840       break;
       
   841     }
       
   842     default:
       
   843       res = GST_BASE_SRC_CLASS (parent_class)->query (src, query);
       
   844       break;
       
   845   }
       
   846 
       
   847   return res;
       
   848 }
       
   849 
       
   850 /* will be called in push mode */
       
   851 static gboolean
       
   852 gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
       
   853 {
       
   854   GstAppSrc *appsrc = GST_APP_SRC (src);
       
   855   gint64 desired_position;
       
   856   gboolean res = FALSE;
       
   857 
       
   858   desired_position = segment->last_stop;
       
   859 
       
   860   GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
       
   861       desired_position, gst_format_get_name (segment->format));
       
   862 
       
   863   /* no need to try to seek in streaming mode */
       
   864   if (appsrc->priv->stream_type == GST_APP_STREAM_TYPE_STREAM)
       
   865     return TRUE;
       
   866 
       
   867   if (appsrc->priv->callbacks.seek_data)
       
   868     res = appsrc->priv->callbacks.seek_data (appsrc, desired_position,
       
   869         appsrc->priv->user_data);
       
   870   else {
       
   871     gboolean emit;
       
   872 
       
   873     g_mutex_lock (appsrc->priv->mutex);
       
   874     emit = appsrc->priv->emit_signals;
       
   875     g_mutex_unlock (appsrc->priv->mutex);
       
   876 
       
   877     if (emit)
       
   878       g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
       
   879           desired_position, &res);
       
   880   }
       
   881 
       
   882   if (res) {
       
   883     GST_DEBUG_OBJECT (appsrc, "flushing queue");
       
   884     gst_app_src_flush_queued (appsrc);
       
   885   } else {
       
   886     GST_WARNING_OBJECT (appsrc, "seek failed");
       
   887   }
       
   888 
       
   889   return res;
       
   890 }
       
   891 
       
   892 static GstFlowReturn
       
   893 gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
       
   894     GstBuffer ** buf)
       
   895 {
       
   896   GstAppSrc *appsrc = GST_APP_SRC (bsrc);
       
   897   GstFlowReturn ret;
       
   898 
       
   899   g_mutex_lock (appsrc->priv->mutex);
       
   900   /* check flushing first */
       
   901   if (G_UNLIKELY (appsrc->priv->flushing))
       
   902     goto flushing;
       
   903 
       
   904   if (appsrc->priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
       
   905     /* if we are dealing with a random-access stream, issue a seek if the offset
       
   906      * changed. */
       
   907     if (G_UNLIKELY (appsrc->priv->offset != offset)) {
       
   908       gboolean res;
       
   909       gboolean emit;
       
   910 
       
   911       emit = appsrc->priv->emit_signals;
       
   912       g_mutex_unlock (appsrc->priv->mutex);
       
   913 
       
   914       GST_DEBUG_OBJECT (appsrc,
       
   915           "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
       
   916           appsrc->priv->offset, offset);
       
   917 
       
   918       if (appsrc->priv->callbacks.seek_data)
       
   919         res = appsrc->priv->callbacks.seek_data (appsrc, offset,
       
   920             appsrc->priv->user_data);
       
   921       else if (emit)
       
   922         g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
       
   923             offset, &res);
       
   924 
       
   925       if (G_UNLIKELY (!res))
       
   926         /* failing to seek is fatal */
       
   927         goto seek_error;
       
   928 
       
   929       g_mutex_lock (appsrc->priv->mutex);
       
   930 
       
   931       appsrc->priv->offset = offset;
       
   932     }
       
   933   }
       
   934 
       
   935   while (TRUE) {
       
   936     /* return data as long as we have some */
       
   937     if (!g_queue_is_empty (appsrc->priv->queue)) {
       
   938       guint buf_size;
       
   939 
       
   940       *buf = g_queue_pop_head (appsrc->priv->queue);
       
   941       buf_size = GST_BUFFER_SIZE (*buf);
       
   942 
       
   943       GST_DEBUG_OBJECT (appsrc, "we have buffer %p of size %u", *buf, buf_size);
       
   944 
       
   945       appsrc->priv->queued_bytes -= buf_size;
       
   946 
       
   947       /* only update the offset when in random_access mode */
       
   948       if (appsrc->priv->stream_type == GST_APP_STREAM_TYPE_RANDOM_ACCESS) {
       
   949         appsrc->priv->offset += buf_size;
       
   950       }
       
   951 
       
   952       gst_buffer_set_caps (*buf, appsrc->priv->caps);
       
   953 
       
   954       /* signal that we removed an item */
       
   955       g_cond_broadcast (appsrc->priv->cond);
       
   956 
       
   957       ret = GST_FLOW_OK;
       
   958       break;
       
   959     } else {
       
   960       gboolean emit;
       
   961 
       
   962       emit = appsrc->priv->emit_signals;
       
   963       g_mutex_unlock (appsrc->priv->mutex);
       
   964 
       
   965       /* we have no data, we need some. We fire the signal with the size hint. */
       
   966       if (appsrc->priv->callbacks.need_data)
       
   967         appsrc->priv->callbacks.need_data (appsrc, size,
       
   968             appsrc->priv->user_data);
       
   969       else if (emit)
       
   970         g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
       
   971             NULL);
       
   972 
       
   973       g_mutex_lock (appsrc->priv->mutex);
       
   974       /* we can be flushing now because we released the lock */
       
   975       if (G_UNLIKELY (appsrc->priv->flushing))
       
   976         goto flushing;
       
   977 
       
   978       /* if we have a buffer now, continue the loop and try to return it. In
       
   979        * random-access mode (where a buffer is normally pushed in the above
       
   980        * signal) we can still be empty because the pushed buffer got flushed or
       
   981        * when the application pushes the requested buffer later, we support both
       
   982        * possiblities. */
       
   983       if (!g_queue_is_empty (appsrc->priv->queue))
       
   984         continue;
       
   985 
       
   986       /* no buffer yet, maybe we are EOS, if not, block for more data. */
       
   987     }
       
   988 
       
   989     /* check EOS */
       
   990     if (G_UNLIKELY (appsrc->priv->is_eos))
       
   991       goto eos;
       
   992 
       
   993     /* nothing to return, wait a while for new data or flushing. */
       
   994     g_cond_wait (appsrc->priv->cond, appsrc->priv->mutex);
       
   995   }
       
   996   g_mutex_unlock (appsrc->priv->mutex);
       
   997 
       
   998   return ret;
       
   999 
       
  1000   /* ERRORS */
       
  1001 flushing:
       
  1002   {
       
  1003     GST_DEBUG_OBJECT (appsrc, "we are flushing");
       
  1004     g_mutex_unlock (appsrc->priv->mutex);
       
  1005     return GST_FLOW_WRONG_STATE;
       
  1006   }
       
  1007 eos:
       
  1008   {
       
  1009     GST_DEBUG_OBJECT (appsrc, "we are EOS");
       
  1010     g_mutex_unlock (appsrc->priv->mutex);
       
  1011     return GST_FLOW_UNEXPECTED;
       
  1012   }
       
  1013 seek_error:
       
  1014   {
       
  1015     GST_ELEMENT_ERROR (appsrc, RESOURCE, READ, ("failed to seek"),
       
  1016         GST_ERROR_SYSTEM);
       
  1017     return GST_FLOW_ERROR;
       
  1018   }
       
  1019 }
       
  1020 
       
  1021 /* external API */
       
  1022 
       
  1023 /**
       
  1024  * gst_app_src_set_caps:
       
  1025  * @appsrc: a #GstAppSrc
       
  1026  * @caps: caps to set
       
  1027  *
       
  1028  * Set the capabilities on the appsrc element.  This function takes
       
  1029  * a copy of the caps structure. After calling this method, the source will
       
  1030  * only produce caps that match @caps. @caps must be fixed and the caps on the
       
  1031  * buffers must match the caps or left NULL.
       
  1032  * 
       
  1033  * Since: 0.10.22
       
  1034  */
       
  1035 EXPORT_C void
       
  1036 gst_app_src_set_caps (GstAppSrc * appsrc, const GstCaps * caps)
       
  1037 {
       
  1038   GstCaps *old;
       
  1039 
       
  1040   g_return_if_fail (GST_IS_APP_SRC (appsrc));
       
  1041 
       
  1042   GST_OBJECT_LOCK (appsrc);
       
  1043   GST_DEBUG_OBJECT (appsrc, "setting caps to %" GST_PTR_FORMAT, caps);
       
  1044   if ((old = appsrc->priv->caps) != caps) {
       
  1045     if (caps)
       
  1046       appsrc->priv->caps = gst_caps_copy (caps);
       
  1047     else
       
  1048       appsrc->priv->caps = NULL;
       
  1049     if (old)
       
  1050       gst_caps_unref (old);
       
  1051   }
       
  1052   GST_OBJECT_UNLOCK (appsrc);
       
  1053 }
       
  1054 
       
  1055 /**
       
  1056  * gst_app_src_get_caps:
       
  1057  * @appsrc: a #GstAppSrc
       
  1058  *
       
  1059  * Get the configured caps on @appsrc.
       
  1060  *
       
  1061  * Returns: the #GstCaps produced by the source. gst_caps_unref() after usage.
       
  1062  * 
       
  1063  * Since: 0.10.22
       
  1064  */
       
  1065 EXPORT_C GstCaps *
       
  1066 gst_app_src_get_caps (GstAppSrc * appsrc)
       
  1067 {
       
  1068   GstCaps *caps;
       
  1069 
       
  1070   g_return_val_if_fail (appsrc != NULL, NULL);
       
  1071   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), NULL);
       
  1072 
       
  1073   GST_OBJECT_LOCK (appsrc);
       
  1074   if ((caps = appsrc->priv->caps))
       
  1075     gst_caps_ref (caps);
       
  1076   GST_DEBUG_OBJECT (appsrc, "getting caps of %" GST_PTR_FORMAT, caps);
       
  1077   GST_OBJECT_UNLOCK (appsrc);
       
  1078 
       
  1079   return caps;
       
  1080 }
       
  1081 
       
  1082 /**
       
  1083  * gst_app_src_set_size:
       
  1084  * @appsrc: a #GstAppSrc
       
  1085  * @size: the size to set
       
  1086  *
       
  1087  * Set the size of the stream in bytes. A value of -1 means that the size is
       
  1088  * not known. 
       
  1089  * 
       
  1090  * Since: 0.10.22
       
  1091  */
       
  1092 EXPORT_C void
       
  1093 gst_app_src_set_size (GstAppSrc * appsrc, gint64 size)
       
  1094 {
       
  1095   g_return_if_fail (appsrc != NULL);
       
  1096   g_return_if_fail (GST_IS_APP_SRC (appsrc));
       
  1097 
       
  1098   GST_OBJECT_LOCK (appsrc);
       
  1099   GST_DEBUG_OBJECT (appsrc, "setting size of %" G_GINT64_FORMAT, size);
       
  1100   appsrc->priv->size = size;
       
  1101   GST_OBJECT_UNLOCK (appsrc);
       
  1102 }
       
  1103 
       
  1104 /**
       
  1105  * gst_app_src_get_size:
       
  1106  * @appsrc: a #GstAppSrc
       
  1107  *
       
  1108  * Get the size of the stream in bytes. A value of -1 means that the size is
       
  1109  * not known. 
       
  1110  *
       
  1111  * Returns: the size of the stream previously set with gst_app_src_set_size();
       
  1112  * 
       
  1113  * Since: 0.10.22
       
  1114  */
       
  1115 EXPORT_C gint64
       
  1116 gst_app_src_get_size (GstAppSrc * appsrc)
       
  1117 {
       
  1118   gint64 size;
       
  1119 
       
  1120   g_return_val_if_fail (appsrc != NULL, -1);
       
  1121   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), -1);
       
  1122 
       
  1123   GST_OBJECT_LOCK (appsrc);
       
  1124   size = appsrc->priv->size;
       
  1125   GST_DEBUG_OBJECT (appsrc, "getting size of %" G_GINT64_FORMAT, size);
       
  1126   GST_OBJECT_UNLOCK (appsrc);
       
  1127 
       
  1128   return size;
       
  1129 }
       
  1130 
       
  1131 /**
       
  1132  * gst_app_src_set_stream_type:
       
  1133  * @appsrc: a #GstAppSrc
       
  1134  * @type: the new state
       
  1135  *
       
  1136  * Set the stream type on @appsrc. For seekable streams, the "seek" signal must
       
  1137  * be connected to.
       
  1138  *
       
  1139  * A stream_type stream 
       
  1140  * 
       
  1141  * Since: 0.10.22
       
  1142  */
       
  1143 EXPORT_C void
       
  1144 gst_app_src_set_stream_type (GstAppSrc * appsrc, GstAppStreamType type)
       
  1145 {
       
  1146   g_return_if_fail (appsrc != NULL);
       
  1147   g_return_if_fail (GST_IS_APP_SRC (appsrc));
       
  1148 
       
  1149   GST_OBJECT_LOCK (appsrc);
       
  1150   GST_DEBUG_OBJECT (appsrc, "setting stream_type of %d", type);
       
  1151   appsrc->priv->stream_type = type;
       
  1152   GST_OBJECT_UNLOCK (appsrc);
       
  1153 }
       
  1154 
       
  1155 /**
       
  1156  * gst_app_src_get_stream_type:
       
  1157  * @appsrc: a #GstAppSrc
       
  1158  *
       
  1159  * Get the stream type. Control the stream type of @appsrc
       
  1160  * with gst_app_src_set_stream_type().
       
  1161  *
       
  1162  * Returns: the stream type.
       
  1163  * 
       
  1164  * Since: 0.10.22
       
  1165  */
       
  1166 EXPORT_C GstAppStreamType
       
  1167 gst_app_src_get_stream_type (GstAppSrc * appsrc)
       
  1168 {
       
  1169   gboolean stream_type;
       
  1170 
       
  1171   g_return_val_if_fail (appsrc != NULL, FALSE);
       
  1172   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
       
  1173 
       
  1174   GST_OBJECT_LOCK (appsrc);
       
  1175   stream_type = appsrc->priv->stream_type;
       
  1176   GST_DEBUG_OBJECT (appsrc, "getting stream_type of %d", stream_type);
       
  1177   GST_OBJECT_UNLOCK (appsrc);
       
  1178 
       
  1179   return stream_type;
       
  1180 }
       
  1181 
       
  1182 /**
       
  1183  * gst_app_src_set_max_bytes:
       
  1184  * @appsrc: a #GstAppSrc
       
  1185  * @max: the maximum number of bytes to queue
       
  1186  *
       
  1187  * Set the maximum amount of bytes that can be queued in @appsrc.
       
  1188  * After the maximum amount of bytes are queued, @appsrc will emit the
       
  1189  * "enough-data" signal.
       
  1190  * 
       
  1191  * Since: 0.10.22
       
  1192  */
       
  1193 EXPORT_C void
       
  1194 gst_app_src_set_max_bytes (GstAppSrc * appsrc, guint64 max)
       
  1195 {
       
  1196   g_return_if_fail (GST_IS_APP_SRC (appsrc));
       
  1197 
       
  1198   g_mutex_lock (appsrc->priv->mutex);
       
  1199   if (max != appsrc->priv->max_bytes) {
       
  1200     GST_DEBUG_OBJECT (appsrc, "setting max-bytes to %" G_GUINT64_FORMAT, max);
       
  1201     appsrc->priv->max_bytes = max;
       
  1202     /* signal the change */
       
  1203     g_cond_broadcast (appsrc->priv->cond);
       
  1204   }
       
  1205   g_mutex_unlock (appsrc->priv->mutex);
       
  1206 }
       
  1207 
       
  1208 /**
       
  1209  * gst_app_src_get_max_bytes:
       
  1210  * @appsrc: a #GstAppSrc
       
  1211  *
       
  1212  * Get the maximum amount of bytes that can be queued in @appsrc.
       
  1213  *
       
  1214  * Returns: The maximum amount of bytes that can be queued.
       
  1215  * 
       
  1216  * Since: 0.10.22
       
  1217  */
       
  1218 EXPORT_C guint64
       
  1219 gst_app_src_get_max_bytes (GstAppSrc * appsrc)
       
  1220 {
       
  1221   guint64 result;
       
  1222 
       
  1223   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), 0);
       
  1224 
       
  1225   g_mutex_lock (appsrc->priv->mutex);
       
  1226   result = appsrc->priv->max_bytes;
       
  1227   GST_DEBUG_OBJECT (appsrc, "getting max-bytes of %" G_GUINT64_FORMAT, result);
       
  1228   g_mutex_unlock (appsrc->priv->mutex);
       
  1229 
       
  1230   return result;
       
  1231 }
       
  1232 
       
  1233 static void
       
  1234 gst_app_src_set_latencies (GstAppSrc * appsrc, gboolean do_min, guint64 min,
       
  1235     gboolean do_max, guint64 max)
       
  1236 {
       
  1237   gboolean changed = FALSE;
       
  1238 
       
  1239   g_mutex_lock (appsrc->priv->mutex);
       
  1240   if (do_min && appsrc->priv->min_latency != min) {
       
  1241     appsrc->priv->min_latency = min;
       
  1242     changed = TRUE;
       
  1243   }
       
  1244   if (do_max && appsrc->priv->max_latency != max) {
       
  1245     appsrc->priv->max_latency = max;
       
  1246     changed = TRUE;
       
  1247   }
       
  1248   g_mutex_unlock (appsrc->priv->mutex);
       
  1249 
       
  1250   if (changed) {
       
  1251     GST_DEBUG_OBJECT (appsrc, "posting latency changed");
       
  1252     gst_element_post_message (GST_ELEMENT_CAST (appsrc),
       
  1253         gst_message_new_latency (GST_OBJECT_CAST (appsrc)));
       
  1254   }
       
  1255 }
       
  1256 
       
  1257 /**
       
  1258  * gst_app_src_set_latency:
       
  1259  * @appsrc: a #GstAppSrc
       
  1260  * @min: the min latency
       
  1261  * @max: the min latency
       
  1262  *
       
  1263  * Configure the @min and @max latency in @src. If @min is set to -1, the
       
  1264  * default latency calculations for pseudo-live sources will be used.
       
  1265  * 
       
  1266  * Since: 0.10.22
       
  1267  */
       
  1268 EXPORT_C void
       
  1269 gst_app_src_set_latency (GstAppSrc * appsrc, guint64 min, guint64 max)
       
  1270 {
       
  1271   gst_app_src_set_latencies (appsrc, TRUE, min, TRUE, max);
       
  1272 }
       
  1273 
       
  1274 /**
       
  1275  * gst_app_src_get_latency:
       
  1276  * @appsrc: a #GstAppSrc
       
  1277  * @min: the min latency
       
  1278  * @max: the min latency
       
  1279  *
       
  1280  * Retrieve the min and max latencies in @min and @max respectively.
       
  1281  * 
       
  1282  * Since: 0.10.22
       
  1283  */
       
  1284 EXPORT_C void
       
  1285 gst_app_src_get_latency (GstAppSrc * appsrc, guint64 * min, guint64 * max)
       
  1286 {
       
  1287   g_return_if_fail (GST_IS_APP_SRC (appsrc));
       
  1288 
       
  1289   g_mutex_lock (appsrc->priv->mutex);
       
  1290   if (min)
       
  1291     *min = appsrc->priv->min_latency;
       
  1292   if (max)
       
  1293     *max = appsrc->priv->max_latency;
       
  1294   g_mutex_unlock (appsrc->priv->mutex);
       
  1295 }
       
  1296 
       
  1297 /**
       
  1298  * gst_app_src_set_emit_signals:
       
  1299  * @appsrc: a #GstAppSrc
       
  1300  * @emit: the new state
       
  1301  *
       
  1302  * Make appsrc emit the "new-preroll" and "new-buffer" signals. This option is
       
  1303  * by default disabled because signal emission is expensive and unneeded when
       
  1304  * the application prefers to operate in pull mode.
       
  1305  *
       
  1306  * Since: 0.10.23
       
  1307  */
       
  1308 EXPORT_C void
       
  1309 gst_app_src_set_emit_signals (GstAppSrc * appsrc, gboolean emit)
       
  1310 {
       
  1311   g_return_if_fail (GST_IS_APP_SRC (appsrc));
       
  1312 
       
  1313   g_mutex_lock (appsrc->priv->mutex);
       
  1314   appsrc->priv->emit_signals = emit;
       
  1315   g_mutex_unlock (appsrc->priv->mutex);
       
  1316 }
       
  1317 
       
  1318 /**
       
  1319  * gst_app_src_get_emit_signals:
       
  1320  * @appsrc: a #GstAppSrc
       
  1321  *
       
  1322  * Check if appsrc will emit the "new-preroll" and "new-buffer" signals.
       
  1323  *
       
  1324  * Returns: %TRUE if @appsrc is emiting the "new-preroll" and "new-buffer"
       
  1325  * signals.
       
  1326  *
       
  1327  * Since: 0.10.23
       
  1328  */
       
  1329 EXPORT_C gboolean
       
  1330 gst_app_src_get_emit_signals (GstAppSrc * appsrc)
       
  1331 {
       
  1332   gboolean result;
       
  1333 
       
  1334   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), FALSE);
       
  1335 
       
  1336   g_mutex_lock (appsrc->priv->mutex);
       
  1337   result = appsrc->priv->emit_signals;
       
  1338   g_mutex_unlock (appsrc->priv->mutex);
       
  1339 
       
  1340   return result;
       
  1341 }
       
  1342 
       
  1343 static GstFlowReturn
       
  1344 gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
       
  1345     gboolean steal_ref)
       
  1346 {
       
  1347   gboolean first = TRUE;
       
  1348 
       
  1349   g_return_val_if_fail (appsrc, GST_FLOW_ERROR);
       
  1350   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
       
  1351   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
       
  1352 
       
  1353   g_mutex_lock (appsrc->priv->mutex);
       
  1354 
       
  1355   while (TRUE) {
       
  1356     /* can't accept buffers when we are flushing or EOS */
       
  1357     if (appsrc->priv->flushing)
       
  1358       goto flushing;
       
  1359 
       
  1360     if (appsrc->priv->is_eos)
       
  1361       goto eos;
       
  1362 
       
  1363     if (appsrc->priv->max_bytes
       
  1364         && appsrc->priv->queued_bytes >= appsrc->priv->max_bytes) {
       
  1365       GST_DEBUG_OBJECT (appsrc,
       
  1366           "queue filled (%" G_GUINT64_FORMAT " >= %" G_GUINT64_FORMAT ")",
       
  1367           appsrc->priv->queued_bytes, appsrc->priv->max_bytes);
       
  1368 
       
  1369       if (first) {
       
  1370         gboolean emit;
       
  1371 
       
  1372         emit = appsrc->priv->emit_signals;
       
  1373         /* only signal on the first push */
       
  1374         g_mutex_unlock (appsrc->priv->mutex);
       
  1375 
       
  1376         if (appsrc->priv->callbacks.enough_data)
       
  1377           appsrc->priv->callbacks.enough_data (appsrc, appsrc->priv->user_data);
       
  1378         else if (emit)
       
  1379           g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
       
  1380               NULL);
       
  1381 
       
  1382         g_mutex_lock (appsrc->priv->mutex);
       
  1383         /* continue to check for flushing/eos after releasing the lock */
       
  1384         first = FALSE;
       
  1385         continue;
       
  1386       }
       
  1387       if (appsrc->priv->block) {
       
  1388         GST_DEBUG_OBJECT (appsrc, "waiting for free space");
       
  1389         /* we are filled, wait until a buffer gets popped or when we
       
  1390          * flush. */
       
  1391         g_cond_wait (appsrc->priv->cond, appsrc->priv->mutex);
       
  1392       } else {
       
  1393         /* no need to wait for free space, we just pump more data into the
       
  1394          * queue hoping that the caller reacts to the enough-data signal and
       
  1395          * stops pushing buffers. */
       
  1396         break;
       
  1397       }
       
  1398     } else
       
  1399       break;
       
  1400   }
       
  1401 
       
  1402   GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
       
  1403   if (!steal_ref)
       
  1404     gst_buffer_ref (buffer);
       
  1405   g_queue_push_tail (appsrc->priv->queue, buffer);
       
  1406   appsrc->priv->queued_bytes += GST_BUFFER_SIZE (buffer);
       
  1407   g_cond_broadcast (appsrc->priv->cond);
       
  1408   g_mutex_unlock (appsrc->priv->mutex);
       
  1409 
       
  1410   return GST_FLOW_OK;
       
  1411 
       
  1412   /* ERRORS */
       
  1413 flushing:
       
  1414   {
       
  1415     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are flushing", buffer);
       
  1416     if (steal_ref)
       
  1417       gst_buffer_unref (buffer);
       
  1418     g_mutex_unlock (appsrc->priv->mutex);
       
  1419     return GST_FLOW_WRONG_STATE;
       
  1420   }
       
  1421 eos:
       
  1422   {
       
  1423     GST_DEBUG_OBJECT (appsrc, "refuse buffer %p, we are EOS", buffer);
       
  1424     if (steal_ref)
       
  1425       gst_buffer_unref (buffer);
       
  1426     g_mutex_unlock (appsrc->priv->mutex);
       
  1427     return GST_FLOW_UNEXPECTED;
       
  1428   }
       
  1429 }
       
  1430 
       
  1431 /**
       
  1432  * gst_app_src_push_buffer:
       
  1433  * @appsrc: a #GstAppSrc
       
  1434  * @buffer: a #GstBuffer to push
       
  1435  *
       
  1436  * Adds a buffer to the queue of buffers that the appsrc element will
       
  1437  * push to its source pad.  This function takes ownership of the buffer.
       
  1438  *
       
  1439  * When the block property is TRUE, this function can block until free
       
  1440  * space becomes available in the queue.
       
  1441  *
       
  1442  * Returns: #GST_FLOW_OK when the buffer was successfuly queued.
       
  1443  * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
       
  1444  * #GST_FLOW_UNEXPECTED when EOS occured.
       
  1445  * 
       
  1446  * Since: 0.10.22
       
  1447  */
       
  1448 EXPORT_C GstFlowReturn
       
  1449 gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
       
  1450 {
       
  1451   return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
       
  1452 }
       
  1453 
       
  1454 /* push a buffer without stealing the ref of the buffer. This is used for the
       
  1455  * action signal. */
       
  1456 static GstFlowReturn
       
  1457 gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
       
  1458 {
       
  1459   return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
       
  1460 }
       
  1461 
       
  1462 /**
       
  1463  * gst_app_src_end_of_stream:
       
  1464  * @appsrc: a #GstAppSrc
       
  1465  *
       
  1466  * Indicates to the appsrc element that the last buffer queued in the
       
  1467  * element is the last buffer of the stream.
       
  1468  *
       
  1469  * Returns: #GST_FLOW_OK when the EOS was successfuly queued.
       
  1470  * #GST_FLOW_WRONG_STATE when @appsrc is not PAUSED or PLAYING.
       
  1471  * 
       
  1472  * Since: 0.10.22
       
  1473  */
       
  1474 EXPORT_C GstFlowReturn
       
  1475 gst_app_src_end_of_stream (GstAppSrc * appsrc)
       
  1476 {
       
  1477   g_return_val_if_fail (appsrc, GST_FLOW_ERROR);
       
  1478   g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
       
  1479 
       
  1480   g_mutex_lock (appsrc->priv->mutex);
       
  1481   /* can't accept buffers when we are flushing. We can accept them when we are 
       
  1482    * EOS although it will not do anything. */
       
  1483   if (appsrc->priv->flushing)
       
  1484     goto flushing;
       
  1485 
       
  1486   GST_DEBUG_OBJECT (appsrc, "sending EOS");
       
  1487   appsrc->priv->is_eos = TRUE;
       
  1488   g_cond_broadcast (appsrc->priv->cond);
       
  1489   g_mutex_unlock (appsrc->priv->mutex);
       
  1490 
       
  1491   return GST_FLOW_OK;
       
  1492 
       
  1493   /* ERRORS */
       
  1494 flushing:
       
  1495   {
       
  1496     g_mutex_unlock (appsrc->priv->mutex);
       
  1497     GST_DEBUG_OBJECT (appsrc, "refuse EOS, we are flushing");
       
  1498     return GST_FLOW_WRONG_STATE;
       
  1499   }
       
  1500 }
       
  1501 
       
  1502 /**
       
  1503  * gst_app_src_set_callbacks:
       
  1504  * @appsrc: a #GstAppSrc
       
  1505  * @callbacks: the callbacks
       
  1506  * @user_data: a user_data argument for the callbacks
       
  1507  * @notify: a destroy notify function
       
  1508  *
       
  1509  * Set callbacks which will be executed when data is needed, enough data has
       
  1510  * been collected or when a seek should be performed.
       
  1511  * This is an alternative to using the signals, it has lower overhead and is thus
       
  1512  * less expensive, but also less flexible.
       
  1513  *
       
  1514  * If callbacks are installed, no signals will be emited for performance
       
  1515  * reasons.
       
  1516  *
       
  1517  * Since: 0.10.23
       
  1518  */
       
  1519 EXPORT_C void
       
  1520 gst_app_src_set_callbacks (GstAppSrc * appsrc,
       
  1521     GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
       
  1522 {
       
  1523   GDestroyNotify old_notify;
       
  1524 
       
  1525   g_return_if_fail (appsrc != NULL);
       
  1526   g_return_if_fail (GST_IS_APP_SRC (appsrc));
       
  1527   g_return_if_fail (callbacks != NULL);
       
  1528 
       
  1529   GST_OBJECT_LOCK (appsrc);
       
  1530   old_notify = appsrc->priv->notify;
       
  1531 
       
  1532   if (old_notify) {
       
  1533     gpointer old_data;
       
  1534 
       
  1535     old_data = appsrc->priv->user_data;
       
  1536 
       
  1537     appsrc->priv->user_data = NULL;
       
  1538     appsrc->priv->notify = NULL;
       
  1539     GST_OBJECT_UNLOCK (appsrc);
       
  1540 
       
  1541     old_notify (old_data);
       
  1542 
       
  1543     GST_OBJECT_LOCK (appsrc);
       
  1544   }
       
  1545   appsrc->priv->callbacks = *callbacks;
       
  1546   appsrc->priv->user_data = user_data;
       
  1547   appsrc->priv->notify = notify;
       
  1548   GST_OBJECT_UNLOCK (appsrc);
       
  1549 }
       
  1550 
       
  1551 /*** GSTURIHANDLER INTERFACE *************************************************/
       
  1552 
       
  1553 static GstURIType
       
  1554 gst_app_src_uri_get_type (void)
       
  1555 {
       
  1556   return GST_URI_SRC;
       
  1557 }
       
  1558 
       
  1559 static gchar **
       
  1560 gst_app_src_uri_get_protocols (void)
       
  1561 {
       
  1562   static gchar *protocols[] = { "appsrc", NULL };
       
  1563 
       
  1564   return protocols;
       
  1565 }
       
  1566 
       
  1567 static const gchar *
       
  1568 gst_app_src_uri_get_uri (GstURIHandler * handler)
       
  1569 {
       
  1570   return "appsrc";
       
  1571 }
       
  1572 
       
  1573 static gboolean
       
  1574 gst_app_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
       
  1575 {
       
  1576   gchar *protocol;
       
  1577   gboolean ret;
       
  1578 
       
  1579   protocol = gst_uri_get_protocol (uri);
       
  1580   ret = !strcmp (protocol, "appsrc");
       
  1581   g_free (protocol);
       
  1582 
       
  1583   return ret;
       
  1584 }
       
  1585 
       
  1586 static void
       
  1587 gst_app_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
       
  1588 {
       
  1589   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
       
  1590 
       
  1591   iface->get_type = gst_app_src_uri_get_type;
       
  1592   iface->get_protocols = gst_app_src_uri_get_protocols;
       
  1593   iface->get_uri = gst_app_src_uri_get_uri;
       
  1594   iface->set_uri = gst_app_src_uri_set_uri;
       
  1595 }