gstreamer_core/gst/gstbus.c
changeset 0 0e761a78d257
child 8 4a7fac7dd34a
equal deleted inserted replaced
-1:000000000000 0:0e761a78d257
       
     1 /* GStreamer
       
     2  * Copyright (C) 2004 Wim Taymans <wim@fluendo.com>
       
     3  *
       
     4  * gstbus.c: GstBus subsystem
       
     5  *
       
     6  * This library is free software; you can redistribute it and/or
       
     7  * modify it under the terms of the GNU Library General Public
       
     8  * License as published by the Free Software Foundation; either
       
     9  * version 2 of the License, or (at your option) any later version.
       
    10  *
       
    11  * This library is distributed in the hope that it will be useful,
       
    12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
       
    14  * Library General Public License for more details.
       
    15  *
       
    16  * You should have received a copy of the GNU Library General Public
       
    17  * License along with this library; if not, write to the
       
    18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
       
    19  * Boston, MA 02111-1307, USA.
       
    20  */
       
    21 
       
    22 /**
       
    23  * SECTION:gstbus
       
    24  * @short_description: Asynchronous message bus subsystem
       
    25  * @see_also: #GstMessage, #GstElement
       
    26  *
       
    27  * The #GstBus is an object responsible for delivering #GstMessages in
       
    28  * a first-in first-out way from the streaming threads to the application.
       
    29  *
       
    30  * Since the application typically only wants to deal with delivery of these
       
    31  * messages from one thread, the GstBus will marshall the messages between
       
    32  * different threads. This is important since the actual streaming of media
       
    33  * is done in another thread than the application.
       
    34  *
       
    35  * The GstBus provides support for #GSource based notifications. This makes it
       
    36  * possible to handle the delivery in the glib mainloop.
       
    37  *
       
    38  * The #GSource callback function gst_bus_async_signal_func() can be used to
       
    39  * convert all bus messages into signal emissions.
       
    40  *
       
    41  * A message is posted on the bus with the gst_bus_post() method. With the
       
    42  * gst_bus_peek() and gst_bus_pop() methods one can look at or retrieve a
       
    43  * previously posted message.
       
    44  *
       
    45  * The bus can be polled with the gst_bus_poll() method. This method blocks
       
    46  * up to the specified timeout value until one of the specified message types
       
    47  * is posted on the bus. The application can then _pop() the messages from the
       
    48  * bus to handle them.
       
    49  * Alternatively the application can register an asynchronous bus function
       
    50  * using gst_bus_add_watch_full() or gst_bus_add_watch(). This function will
       
    51  * install a #GSource in the default glib main loop and will deliver messages 
       
    52  * a short while after they have been posted. Note that the main loop should 
       
    53  * be running for the asynchronous callbacks.
       
    54  *
       
    55  * It is also possible to get messages from the bus without any thread
       
    56  * marshalling with the gst_bus_set_sync_handler() method. This makes it
       
    57  * possible to react to a message in the same thread that posted the
       
    58  * message on the bus. This should only be used if the application is able
       
    59  * to deal with messages from different threads.
       
    60  *
       
    61  * Every #GstPipeline has one bus.
       
    62  *
       
    63  * Note that a #GstPipeline will set its bus into flushing state when changing
       
    64  * from READY to NULL state.
       
    65  *
       
    66  * Last reviewed on 2006-03-12 (0.10.5)
       
    67  */
       
    68 
       
    69 #include "gst_private.h"
       
    70 #include <errno.h>
       
    71 #ifdef HAVE_UNISTD_H
       
    72 #  include <unistd.h>
       
    73 #endif
       
    74 #include <sys/types.h>
       
    75 
       
    76 #include "gstinfo.h"
       
    77 
       
    78 #include "gstbus.h"
       
    79 
       
    80 #ifdef __SYMBIAN32__
       
    81 #include <glib_global.h>
       
    82 #endif
       
    83 
       
    84 #define GST_CAT_DEFAULT GST_CAT_BUS
       
    85 /* bus signals */
       
    86 enum
       
    87 {
       
    88   SYNC_MESSAGE,
       
    89   ASYNC_MESSAGE,
       
    90   /* add more above */
       
    91   LAST_SIGNAL
       
    92 };
       
    93 
       
    94 static void gst_bus_class_init (GstBusClass * klass);
       
    95 static void gst_bus_init (GstBus * bus);
       
    96 static void gst_bus_dispose (GObject * object);
       
    97 
       
    98 static void gst_bus_set_property (GObject * object, guint prop_id,
       
    99     const GValue * value, GParamSpec * pspec);
       
   100 static void gst_bus_get_property (GObject * object, guint prop_id,
       
   101     GValue * value, GParamSpec * pspec);
       
   102 
       
   103 static GstObjectClass *parent_class = NULL;
       
   104 static guint gst_bus_signals[LAST_SIGNAL] = { 0 };
       
   105 
       
   106 /* the context we wakeup when we posted a message on the bus */
       
   107 static GMainContext *main_context;
       
   108 
       
   109 struct _GstBusPrivate
       
   110 {
       
   111   guint num_sync_message_emitters;
       
   112 
       
   113   GCond *queue_cond;
       
   114 };
       
   115 #ifdef __SYMBIAN32__
       
   116 EXPORT_C
       
   117 #endif
       
   118 
       
   119 
       
   120 GType
       
   121 gst_bus_get_type (void)
       
   122 {
       
   123   static GType bus_type = 0;
       
   124 
       
   125   if (G_UNLIKELY (bus_type == 0)) {
       
   126     static const GTypeInfo bus_info = {
       
   127       sizeof (GstBusClass),
       
   128       NULL,
       
   129       NULL,
       
   130       (GClassInitFunc) gst_bus_class_init,
       
   131       NULL,
       
   132       NULL,
       
   133       sizeof (GstBus),
       
   134       0,
       
   135       (GInstanceInitFunc) gst_bus_init,
       
   136       NULL
       
   137     };
       
   138 
       
   139     bus_type = g_type_register_static (GST_TYPE_OBJECT, "GstBus", &bus_info, 0);
       
   140   }
       
   141   return bus_type;
       
   142 }
       
   143 
       
   144 /* fixme: do something about this */
       
   145 static void
       
   146 marshal_VOID__MINIOBJECT (GClosure * closure, GValue * return_value,
       
   147     guint n_param_values, const GValue * param_values, gpointer invocation_hint,
       
   148     gpointer marshal_data)
       
   149 {
       
   150   typedef void (*marshalfunc_VOID__MINIOBJECT) (gpointer obj, gpointer arg1,
       
   151       gpointer data2);
       
   152   register marshalfunc_VOID__MINIOBJECT callback;
       
   153   register GCClosure *cc = (GCClosure *) closure;
       
   154   register gpointer data1, data2;
       
   155 
       
   156   g_return_if_fail (n_param_values == 2);
       
   157 
       
   158   if (G_CCLOSURE_SWAP_DATA (closure)) {
       
   159     data1 = closure->data;
       
   160     data2 = g_value_peek_pointer (param_values + 0);
       
   161   } else {
       
   162     data1 = g_value_peek_pointer (param_values + 0);
       
   163     data2 = closure->data;
       
   164   }
       
   165   callback =
       
   166       (marshalfunc_VOID__MINIOBJECT) (marshal_data ? marshal_data : cc->
       
   167       callback);
       
   168 
       
   169   callback (data1, gst_value_get_mini_object (param_values + 1), data2);
       
   170 }
       
   171 
       
   172 static void
       
   173 gst_bus_class_init (GstBusClass * klass)
       
   174 {
       
   175   GObjectClass *gobject_class;
       
   176   GstObjectClass *gstobject_class;
       
   177 
       
   178   gobject_class = (GObjectClass *) klass;
       
   179   gstobject_class = (GstObjectClass *) klass;
       
   180 
       
   181   parent_class = g_type_class_peek_parent (klass);
       
   182 
       
   183   if (!g_thread_supported ())
       
   184     g_thread_init (NULL);
       
   185 
       
   186   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_bus_dispose);
       
   187   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_bus_set_property);
       
   188   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_bus_get_property);
       
   189 
       
   190   /**
       
   191    * GstBus::sync-message:
       
   192    * @bus: the object which received the signal
       
   193    * @message: the message that has been posted synchronously
       
   194    *
       
   195    * A message has been posted on the bus. This signal is emitted from the
       
   196    * thread that posted the message so one has to be careful with locking.
       
   197    *
       
   198    * This signal will not be emitted by default, you have to set up
       
   199    * gst_bus_sync_signal_handler() as a sync handler if you want this
       
   200    * signal to be emitted when a message is posted on the bus, like this:
       
   201    * <programlisting>
       
   202    * gst_bus_set_sync_handler (bus, gst_bus_sync_signal_handler, yourdata);
       
   203    * </programlisting>
       
   204    */
       
   205   gst_bus_signals[SYNC_MESSAGE] =
       
   206       g_signal_new ("sync-message", G_TYPE_FROM_CLASS (klass),
       
   207       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
       
   208       G_STRUCT_OFFSET (GstBusClass, sync_message), NULL, NULL,
       
   209       marshal_VOID__MINIOBJECT, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
       
   210 
       
   211   /**
       
   212    * GstBus::message:
       
   213    * @bus: the object which received the signal
       
   214    * @message: the message that has been posted asynchronously
       
   215    *
       
   216    * A message has been posted on the bus. This signal is emitted from a
       
   217    * GSource added to the mainloop. this signal will only be emitted when
       
   218    * there is a mainloop running.
       
   219    */
       
   220   gst_bus_signals[ASYNC_MESSAGE] =
       
   221       g_signal_new ("message", G_TYPE_FROM_CLASS (klass),
       
   222       G_SIGNAL_RUN_LAST | G_SIGNAL_DETAILED,
       
   223       G_STRUCT_OFFSET (GstBusClass, message), NULL, NULL,
       
   224       marshal_VOID__MINIOBJECT, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
       
   225 
       
   226   main_context = g_main_context_default ();
       
   227 
       
   228   g_type_class_add_private (klass, sizeof (GstBusPrivate));
       
   229 }
       
   230 
       
   231 static void
       
   232 gst_bus_init (GstBus * bus)
       
   233 {
       
   234   bus->queue = g_queue_new ();
       
   235   bus->queue_lock = g_mutex_new ();
       
   236 
       
   237   bus->priv = G_TYPE_INSTANCE_GET_PRIVATE (bus, GST_TYPE_BUS, GstBusPrivate);
       
   238   bus->priv->queue_cond = g_cond_new ();
       
   239 
       
   240   GST_DEBUG_OBJECT (bus, "created");
       
   241 }
       
   242 
       
   243 static void
       
   244 gst_bus_dispose (GObject * object)
       
   245 {
       
   246   GstBus *bus;
       
   247 
       
   248   bus = GST_BUS (object);
       
   249 
       
   250   if (bus->queue) {
       
   251     GstMessage *message;
       
   252 
       
   253     g_mutex_lock (bus->queue_lock);
       
   254     do {
       
   255       message = g_queue_pop_head (bus->queue);
       
   256       if (message)
       
   257         gst_message_unref (message);
       
   258     } while (message != NULL);
       
   259     g_queue_free (bus->queue);
       
   260     bus->queue = NULL;
       
   261     g_mutex_unlock (bus->queue_lock);
       
   262     g_mutex_free (bus->queue_lock);
       
   263     bus->queue_lock = NULL;
       
   264     g_cond_free (bus->priv->queue_cond);
       
   265     bus->priv->queue_cond = NULL;
       
   266   }
       
   267 
       
   268   G_OBJECT_CLASS (parent_class)->dispose (object);
       
   269 }
       
   270 
       
   271 static void
       
   272 gst_bus_set_property (GObject * object, guint prop_id,
       
   273     const GValue * value, GParamSpec * pspec)
       
   274 {
       
   275   GstBus *bus;
       
   276 
       
   277   bus = GST_BUS (object);
       
   278 
       
   279   switch (prop_id) {
       
   280     default:
       
   281       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       
   282       break;
       
   283   }
       
   284 }
       
   285 
       
   286 static void
       
   287 gst_bus_get_property (GObject * object, guint prop_id,
       
   288     GValue * value, GParamSpec * pspec)
       
   289 {
       
   290   GstBus *bus;
       
   291 
       
   292   bus = GST_BUS (object);
       
   293 
       
   294   switch (prop_id) {
       
   295     default:
       
   296       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       
   297       break;
       
   298   }
       
   299 }
       
   300 
       
   301 /**
       
   302  * gst_bus_new:
       
   303  *
       
   304  * Creates a new #GstBus instance.
       
   305  *
       
   306  * Returns: a new #GstBus instance
       
   307  */
       
   308 #ifdef __SYMBIAN32__
       
   309 EXPORT_C
       
   310 #endif
       
   311 
       
   312 GstBus *
       
   313 gst_bus_new (void)
       
   314 {
       
   315   GstBus *result;
       
   316 
       
   317   result = g_object_new (gst_bus_get_type (), NULL);
       
   318   GST_DEBUG_OBJECT (result, "created new bus");
       
   319 
       
   320   return result;
       
   321 }
       
   322 
       
   323 /**
       
   324  * gst_bus_post:
       
   325  * @bus: a #GstBus to post on
       
   326  * @message: The #GstMessage to post
       
   327  *
       
   328  * Post a message on the given bus. Ownership of the message
       
   329  * is taken by the bus.
       
   330  *
       
   331  * Returns: TRUE if the message could be posted, FALSE if the bus is flushing.
       
   332  *
       
   333  * MT safe.
       
   334  */
       
   335 #ifdef __SYMBIAN32__
       
   336 EXPORT_C
       
   337 #endif
       
   338 
       
   339 gboolean
       
   340 gst_bus_post (GstBus * bus, GstMessage * message)
       
   341 {
       
   342   GstBusSyncReply reply = GST_BUS_PASS;
       
   343   GstBusSyncHandler handler;
       
   344   gboolean emit_sync_message;
       
   345   gpointer handler_data;
       
   346 
       
   347   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
       
   348   g_return_val_if_fail (GST_IS_MESSAGE (message), FALSE);
       
   349 
       
   350   GST_DEBUG_OBJECT (bus, "[msg %p] posting on bus, type %s, %" GST_PTR_FORMAT
       
   351       " from source %" GST_PTR_FORMAT,
       
   352       message, GST_MESSAGE_TYPE_NAME (message), message->structure,
       
   353       message->src);
       
   354 
       
   355   GST_OBJECT_LOCK (bus);
       
   356   /* check if the bus is flushing */
       
   357   if (GST_OBJECT_FLAG_IS_SET (bus, GST_BUS_FLUSHING))
       
   358     goto is_flushing;
       
   359 
       
   360   handler = bus->sync_handler;
       
   361   handler_data = bus->sync_handler_data;
       
   362   emit_sync_message = bus->priv->num_sync_message_emitters > 0;
       
   363   GST_OBJECT_UNLOCK (bus);
       
   364 
       
   365   /* first call the sync handler if it is installed */
       
   366   if (handler)
       
   367     reply = handler (bus, message, handler_data);
       
   368 
       
   369   /* emit sync-message if requested to do so via
       
   370      gst_bus_enable_sync_message_emission. terrible but effective */
       
   371   if (emit_sync_message && reply != GST_BUS_DROP
       
   372       && handler != gst_bus_sync_signal_handler)
       
   373     gst_bus_sync_signal_handler (bus, message, NULL);
       
   374 
       
   375   /* now see what we should do with the message */
       
   376   switch (reply) {
       
   377     case GST_BUS_DROP:
       
   378       /* drop the message */
       
   379       GST_DEBUG_OBJECT (bus, "[msg %p] dropped", message);
       
   380       break;
       
   381     case GST_BUS_PASS:
       
   382       /* pass the message to the async queue, refcount passed in the queue */
       
   383       GST_DEBUG_OBJECT (bus, "[msg %p] pushing on async queue", message);
       
   384       g_mutex_lock (bus->queue_lock);
       
   385       g_queue_push_tail (bus->queue, message);
       
   386       g_cond_broadcast (bus->priv->queue_cond);
       
   387       g_mutex_unlock (bus->queue_lock);
       
   388       GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
       
   389 
       
   390       /* FIXME cannot assume sources are only in the default context */
       
   391       g_main_context_wakeup (main_context);
       
   392 
       
   393       break;
       
   394     case GST_BUS_ASYNC:
       
   395     {
       
   396       /* async delivery, we need a mutex and a cond to block
       
   397        * on */
       
   398       GMutex *lock = g_mutex_new ();
       
   399       GCond *cond = g_cond_new ();
       
   400 
       
   401       GST_MESSAGE_COND (message) = cond;
       
   402       GST_MESSAGE_GET_LOCK (message) = lock;
       
   403 
       
   404       GST_DEBUG_OBJECT (bus, "[msg %p] waiting for async delivery", message);
       
   405 
       
   406       /* now we lock the message mutex, send the message to the async
       
   407        * queue. When the message is handled by the app and destroyed,
       
   408        * the cond will be signalled and we can continue */
       
   409       g_mutex_lock (lock);
       
   410       g_mutex_lock (bus->queue_lock);
       
   411       g_queue_push_tail (bus->queue, message);
       
   412       g_cond_broadcast (bus->priv->queue_cond);
       
   413       g_mutex_unlock (bus->queue_lock);
       
   414 
       
   415       /* FIXME cannot assume sources are only in the default context */
       
   416       g_main_context_wakeup (main_context);
       
   417 
       
   418       /* now block till the message is freed */
       
   419       g_cond_wait (cond, lock);
       
   420       g_mutex_unlock (lock);
       
   421 
       
   422       GST_DEBUG_OBJECT (bus, "[msg %p] delivered asynchronously", message);
       
   423 
       
   424       g_mutex_free (lock);
       
   425       g_cond_free (cond);
       
   426       break;
       
   427     }
       
   428     default:
       
   429       g_warning ("invalid return from bus sync handler");
       
   430       break;
       
   431   }
       
   432   return TRUE;
       
   433 
       
   434   /* ERRORS */
       
   435 is_flushing:
       
   436   {
       
   437     GST_DEBUG_OBJECT (bus, "bus is flushing");
       
   438     gst_message_unref (message);
       
   439     GST_OBJECT_UNLOCK (bus);
       
   440 
       
   441     return FALSE;
       
   442   }
       
   443 }
       
   444 
       
   445 /**
       
   446  * gst_bus_have_pending:
       
   447  * @bus: a #GstBus to check
       
   448  *
       
   449  * Check if there are pending messages on the bus that
       
   450  * should be handled.
       
   451  *
       
   452  * Returns: TRUE if there are messages on the bus to be handled, FALSE 
       
   453  * otherwise.
       
   454  *
       
   455  * MT safe.
       
   456  */
       
   457 #ifdef __SYMBIAN32__
       
   458 EXPORT_C
       
   459 #endif
       
   460 
       
   461 gboolean
       
   462 gst_bus_have_pending (GstBus * bus)
       
   463 {
       
   464   gboolean result;
       
   465 
       
   466   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
       
   467 
       
   468   g_mutex_lock (bus->queue_lock);
       
   469   /* see if there is a message on the bus */
       
   470   result = !g_queue_is_empty (bus->queue);
       
   471   g_mutex_unlock (bus->queue_lock);
       
   472 
       
   473   return result;
       
   474 }
       
   475 
       
   476 /**
       
   477  * gst_bus_set_flushing:
       
   478  * @bus: a #GstBus
       
   479  * @flushing: whether or not to flush the bus
       
   480  *
       
   481  * If @flushing, flush out and unref any messages queued in the bus. Releases
       
   482  * references to the message origin objects. Will flush future messages until
       
   483  * gst_bus_set_flushing() sets @flushing to #FALSE.
       
   484  *
       
   485  * MT safe.
       
   486  */
       
   487 #ifdef __SYMBIAN32__
       
   488 EXPORT_C
       
   489 #endif
       
   490 
       
   491 void
       
   492 gst_bus_set_flushing (GstBus * bus, gboolean flushing)
       
   493 {
       
   494   GstMessage *message;
       
   495 
       
   496   GST_OBJECT_LOCK (bus);
       
   497 
       
   498   if (flushing) {
       
   499     GST_OBJECT_FLAG_SET (bus, GST_BUS_FLUSHING);
       
   500 
       
   501     GST_DEBUG_OBJECT (bus, "set bus flushing");
       
   502 
       
   503     while ((message = gst_bus_pop (bus)))
       
   504       gst_message_unref (message);
       
   505   } else {
       
   506     GST_DEBUG_OBJECT (bus, "unset bus flushing");
       
   507     GST_OBJECT_FLAG_UNSET (bus, GST_BUS_FLUSHING);
       
   508   }
       
   509 
       
   510   GST_OBJECT_UNLOCK (bus);
       
   511 }
       
   512 
       
   513 /**
       
   514  * gst_bus_timed_pop_filtered:
       
   515  * @bus: a #GstBus to pop from
       
   516  * @timeout: a timeout in nanoseconds, or GST_CLOCK_TIME_NONE to wait forever
       
   517  * @types: message types to take into account, GST_MESSAGE_ANY for any type
       
   518  *
       
   519  * Get a message from the bus whose type matches the message type mask @types,
       
   520  * waiting up to the specified timeout (and discarding any messages that do not
       
   521  * match the mask provided).
       
   522  *
       
   523  * If @timeout is 0, this function behaves like gst_bus_pop_filtered(). If
       
   524  * @timeout is #GST_CLOCK_TIME_NONE, this function will block forever until a
       
   525  * matching message was posted on the bus.
       
   526  *
       
   527  * Returns: a #GstMessage matching the filter in @types, or NULL if no matching
       
   528  * message was found on the bus until the timeout expired.
       
   529  * The message is taken from the bus and needs to be unreffed with
       
   530  * gst_message_unref() after usage.
       
   531  *
       
   532  * MT safe.
       
   533  *
       
   534  * Since: 0.10.15
       
   535  */
       
   536 #ifdef __SYMBIAN32__
       
   537 EXPORT_C
       
   538 #endif
       
   539 
       
   540 GstMessage *
       
   541 gst_bus_timed_pop_filtered (GstBus * bus, GstClockTime timeout,
       
   542     GstMessageType types)
       
   543 {
       
   544   GstMessage *message;
       
   545   GTimeVal *timeval, abstimeout;
       
   546   gboolean first_round = TRUE;
       
   547 
       
   548   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
       
   549   g_return_val_if_fail (types != 0, NULL);
       
   550 
       
   551   g_mutex_lock (bus->queue_lock);
       
   552 
       
   553   while (TRUE) {
       
   554     GST_LOG_OBJECT (bus, "have %d messages", g_queue_get_length (bus->queue));
       
   555 
       
   556     while ((message = g_queue_pop_head (bus->queue))) {
       
   557       GST_DEBUG_OBJECT (bus, "got message %p, %s, type mask is %u",
       
   558           message, GST_MESSAGE_TYPE_NAME (message), (guint) types);
       
   559       if ((GST_MESSAGE_TYPE (message) & types) != 0) {
       
   560         /* exit the loop, we have a message */
       
   561         goto beach;
       
   562       } else {
       
   563         GST_DEBUG_OBJECT (bus, "discarding message, does not match mask");
       
   564         gst_message_unref (message);
       
   565         message = NULL;
       
   566       }
       
   567     }
       
   568 
       
   569     /* no need to wait, exit loop */
       
   570     if (timeout == 0)
       
   571       break;
       
   572 
       
   573     if (timeout == GST_CLOCK_TIME_NONE) {
       
   574       /* wait forever */
       
   575       timeval = NULL;
       
   576     } else if (first_round) {
       
   577       glong add = timeout / 1000;
       
   578 
       
   579       if (add == 0)
       
   580         /* no need to wait */
       
   581         break;
       
   582 
       
   583       /* make timeout absolute */
       
   584       g_get_current_time (&abstimeout);
       
   585       g_time_val_add (&abstimeout, add);
       
   586       timeval = &abstimeout;
       
   587       first_round = FALSE;
       
   588       GST_DEBUG_OBJECT (bus, "blocking for message, timeout %ld", add);
       
   589     } else {
       
   590       /* calculated the absolute end time already, no need to do it again */
       
   591       GST_DEBUG_OBJECT (bus, "blocking for message, again");
       
   592       timeval = &abstimeout;    /* fool compiler */
       
   593     }
       
   594     if (!g_cond_timed_wait (bus->priv->queue_cond, bus->queue_lock, timeval)) {
       
   595       GST_INFO_OBJECT (bus, "timed out, breaking loop");
       
   596       break;
       
   597     } else {
       
   598       GST_INFO_OBJECT (bus, "we got woken up, recheck for message");
       
   599     }
       
   600   }
       
   601 
       
   602 beach:
       
   603 
       
   604   g_mutex_unlock (bus->queue_lock);
       
   605 
       
   606   return message;
       
   607 }
       
   608 
       
   609 
       
   610 /**
       
   611  * gst_bus_timed_pop:
       
   612  * @bus: a #GstBus to pop
       
   613  * @timeout: a timeout
       
   614  *
       
   615  * Get a message from the bus, waiting up to the specified timeout.
       
   616  *
       
   617  * If @timeout is 0, this function behaves like gst_bus_pop(). If @timeout is
       
   618  * #GST_CLOCK_TIME_NONE, this function will block forever until a message was
       
   619  * posted on the bus.
       
   620  *
       
   621  * Returns: The #GstMessage that is on the bus after the specified timeout
       
   622  * or NULL if the bus is empty after the timeout expired.
       
   623  * The message is taken from the bus and needs to be unreffed with
       
   624  * gst_message_unref() after usage.
       
   625  *
       
   626  * MT safe.
       
   627  *
       
   628  * Since: 0.10.12
       
   629  */
       
   630 #ifdef __SYMBIAN32__
       
   631 EXPORT_C
       
   632 #endif
       
   633 
       
   634 GstMessage *
       
   635 gst_bus_timed_pop (GstBus * bus, GstClockTime timeout)
       
   636 {
       
   637   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
       
   638 
       
   639   return gst_bus_timed_pop_filtered (bus, timeout, GST_MESSAGE_ANY);
       
   640 }
       
   641 
       
   642 /**
       
   643  * gst_bus_pop_filtered:
       
   644  * @bus: a #GstBus to pop
       
   645  * @types: message types to take into account
       
   646  *
       
   647  * Get a message matching @type from the bus.  Will discard all messages on
       
   648  * the bus that do not match @type and that have been posted before the first
       
   649  * message that does match @type.  If there is no message matching @type on
       
   650  * the bus, all messages will be discarded.
       
   651  *
       
   652  * Returns: The next #GstMessage matching @type that is on the bus, or NULL if
       
   653  *     the bus is empty or there is no message matching @type.
       
   654  * The message is taken from the bus and needs to be unreffed with
       
   655  * gst_message_unref() after usage.
       
   656  *
       
   657  * MT safe.
       
   658  *
       
   659  * Since: 0.10.15
       
   660  */
       
   661 #ifdef __SYMBIAN32__
       
   662 EXPORT_C
       
   663 #endif
       
   664 
       
   665 GstMessage *
       
   666 gst_bus_pop_filtered (GstBus * bus, GstMessageType types)
       
   667 {
       
   668   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
       
   669   g_return_val_if_fail (types != 0, NULL);
       
   670 
       
   671   return gst_bus_timed_pop_filtered (bus, 0, types);
       
   672 }
       
   673 
       
   674 /**
       
   675  * gst_bus_pop:
       
   676  * @bus: a #GstBus to pop
       
   677  *
       
   678  * Get a message from the bus.
       
   679  *
       
   680  * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
       
   681  * The message is taken from the bus and needs to be unreffed with
       
   682  * gst_message_unref() after usage.
       
   683  *
       
   684  * MT safe.
       
   685  */
       
   686 #ifdef __SYMBIAN32__
       
   687 EXPORT_C
       
   688 #endif
       
   689 
       
   690 GstMessage *
       
   691 gst_bus_pop (GstBus * bus)
       
   692 {
       
   693   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
       
   694 
       
   695   return gst_bus_timed_pop_filtered (bus, 0, GST_MESSAGE_ANY);
       
   696 }
       
   697 
       
   698 /**
       
   699  * gst_bus_peek:
       
   700  * @bus: a #GstBus
       
   701  *
       
   702  * Peek the message on the top of the bus' queue. The message will remain
       
   703  * on the bus' message queue. A reference is returned, and needs to be unreffed
       
   704  * by the caller.
       
   705  *
       
   706  * Returns: The #GstMessage that is on the bus, or NULL if the bus is empty.
       
   707  *
       
   708  * MT safe.
       
   709  */
       
   710 #ifdef __SYMBIAN32__
       
   711 EXPORT_C
       
   712 #endif
       
   713 
       
   714 GstMessage *
       
   715 gst_bus_peek (GstBus * bus)
       
   716 {
       
   717   GstMessage *message;
       
   718 
       
   719   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
       
   720 
       
   721   g_mutex_lock (bus->queue_lock);
       
   722   message = g_queue_peek_head (bus->queue);
       
   723   if (message)
       
   724     gst_message_ref (message);
       
   725   g_mutex_unlock (bus->queue_lock);
       
   726 
       
   727   GST_DEBUG_OBJECT (bus, "peek on bus, got message %p", message);
       
   728 
       
   729   return message;
       
   730 }
       
   731 
       
   732 /**
       
   733  * gst_bus_set_sync_handler:
       
   734  * @bus: a #GstBus to install the handler on
       
   735  * @func: The handler function to install
       
   736  * @data: User data that will be sent to the handler function.
       
   737  *
       
   738  * Sets the synchronous handler on the bus. The function will be called
       
   739  * every time a new message is posted on the bus. Note that the function
       
   740  * will be called in the same thread context as the posting object. This
       
   741  * function is usually only called by the creator of the bus. Applications
       
   742  * should handle messages asynchronously using the gst_bus watch and poll
       
   743  * functions.
       
   744  *
       
   745  * You cannot replace an existing sync_handler. You can pass NULL to this
       
   746  * function, which will clear the existing handler.
       
   747  */
       
   748 #ifdef __SYMBIAN32__
       
   749 EXPORT_C
       
   750 #endif
       
   751 
       
   752 void
       
   753 gst_bus_set_sync_handler (GstBus * bus, GstBusSyncHandler func, gpointer data)
       
   754 {
       
   755   g_return_if_fail (GST_IS_BUS (bus));
       
   756 
       
   757   GST_OBJECT_LOCK (bus);
       
   758 
       
   759   /* Assert if the user attempts to replace an existing sync_handler,
       
   760    * other than to clear it */
       
   761   if (func != NULL && bus->sync_handler != NULL)
       
   762     goto no_replace;
       
   763 
       
   764   bus->sync_handler = func;
       
   765   bus->sync_handler_data = data;
       
   766   GST_OBJECT_UNLOCK (bus);
       
   767 
       
   768   return;
       
   769 
       
   770 no_replace:
       
   771   {
       
   772     GST_OBJECT_UNLOCK (bus);
       
   773     g_warning ("cannot replace existing sync handler");
       
   774     return;
       
   775   }
       
   776 }
       
   777 
       
   778 /* GSource for the bus
       
   779  */
       
   780 typedef struct
       
   781 {
       
   782   GSource source;
       
   783   GstBus *bus;
       
   784 } GstBusSource;
       
   785 
       
   786 static gboolean
       
   787 gst_bus_source_prepare (GSource * source, gint * timeout)
       
   788 {
       
   789   GstBusSource *bsrc = (GstBusSource *) source;
       
   790 
       
   791   *timeout = -1;
       
   792   return gst_bus_have_pending (bsrc->bus);
       
   793 }
       
   794 
       
   795 static gboolean
       
   796 gst_bus_source_check (GSource * source)
       
   797 {
       
   798   GstBusSource *bsrc = (GstBusSource *) source;
       
   799 
       
   800   return gst_bus_have_pending (bsrc->bus);
       
   801 }
       
   802 
       
   803 static gboolean
       
   804 gst_bus_source_dispatch (GSource * source, GSourceFunc callback,
       
   805     gpointer user_data)
       
   806 {
       
   807   GstBusFunc handler = (GstBusFunc) callback;
       
   808   GstBusSource *bsource = (GstBusSource *) source;
       
   809   GstMessage *message;
       
   810   gboolean keep;
       
   811   GstBus *bus;
       
   812 
       
   813   g_return_val_if_fail (bsource != NULL, FALSE);
       
   814 
       
   815   bus = bsource->bus;
       
   816 
       
   817   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
       
   818 
       
   819   message = gst_bus_pop (bus);
       
   820   g_return_val_if_fail (message != NULL, FALSE);
       
   821 
       
   822   if (!handler)
       
   823     goto no_handler;
       
   824 
       
   825   GST_DEBUG_OBJECT (bus, "source %p calling dispatch with %p", source, message);
       
   826 
       
   827   keep = handler (bus, message, user_data);
       
   828   gst_message_unref (message);
       
   829 
       
   830   GST_DEBUG_OBJECT (bus, "source %p handler returns %d", source, keep);
       
   831 
       
   832   return keep;
       
   833 
       
   834 no_handler:
       
   835   {
       
   836     g_warning ("GstBus watch dispatched without callback\n"
       
   837         "You must call g_source_set_callback().");
       
   838     gst_message_unref (message);
       
   839     return FALSE;
       
   840   }
       
   841 }
       
   842 
       
   843 static void
       
   844 gst_bus_source_finalize (GSource * source)
       
   845 {
       
   846   GstBusSource *bsource = (GstBusSource *) source;
       
   847 
       
   848   gst_object_unref (bsource->bus);
       
   849   bsource->bus = NULL;
       
   850 }
       
   851 
       
   852 static GSourceFuncs gst_bus_source_funcs = {
       
   853   gst_bus_source_prepare,
       
   854   gst_bus_source_check,
       
   855   gst_bus_source_dispatch,
       
   856   gst_bus_source_finalize
       
   857 };
       
   858 
       
   859 /**
       
   860  * gst_bus_create_watch:
       
   861  * @bus: a #GstBus to create the watch for
       
   862  *
       
   863  * Create watch for this bus. The GSource will be dispatched whenever
       
   864  * a message is on the bus. After the GSource is dispatched, the
       
   865  * message is popped off the bus and unreffed.
       
   866  *
       
   867  * Returns: A #GSource that can be added to a mainloop.
       
   868  */
       
   869 #ifdef __SYMBIAN32__
       
   870 EXPORT_C
       
   871 #endif
       
   872 
       
   873 GSource *
       
   874 gst_bus_create_watch (GstBus * bus)
       
   875 {
       
   876   GstBusSource *source;
       
   877 
       
   878   g_return_val_if_fail (GST_IS_BUS (bus), NULL);
       
   879 
       
   880   source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
       
   881       sizeof (GstBusSource));
       
   882   gst_object_ref (bus);
       
   883   source->bus = bus;
       
   884 
       
   885   return (GSource *) source;
       
   886 }
       
   887 
       
   888 /**
       
   889  * gst_bus_add_watch_full:
       
   890  * @bus: a #GstBus to create the watch for.
       
   891  * @priority: The priority of the watch.
       
   892  * @func: A function to call when a message is received.
       
   893  * @user_data: user data passed to @func.
       
   894  * @notify: the function to call when the source is removed.
       
   895  *
       
   896  * Adds a bus watch to the default main context with the given @priority.
       
   897  * This function is used to receive asynchronous messages in the main loop.
       
   898  *
       
   899  * When @func is called, the message belongs to the caller; if you want to
       
   900  * keep a copy of it, call gst_message_ref() before leaving @func.
       
   901  *
       
   902  * The watch can be removed using g_source_remove() or by returning FALSE
       
   903  * from @func.
       
   904  *
       
   905  * Returns: The event source id.
       
   906  *
       
   907  * MT safe.
       
   908  */
       
   909 #ifdef __SYMBIAN32__
       
   910 EXPORT_C
       
   911 #endif
       
   912 
       
   913 guint
       
   914 gst_bus_add_watch_full (GstBus * bus, gint priority,
       
   915     GstBusFunc func, gpointer user_data, GDestroyNotify notify)
       
   916 {
       
   917   guint id;
       
   918   GSource *source;
       
   919 
       
   920   g_return_val_if_fail (GST_IS_BUS (bus), 0);
       
   921 
       
   922   source = gst_bus_create_watch (bus);
       
   923 
       
   924   if (priority != G_PRIORITY_DEFAULT)
       
   925     g_source_set_priority (source, priority);
       
   926 
       
   927   g_source_set_callback (source, (GSourceFunc) func, user_data, notify);
       
   928 
       
   929   id = g_source_attach (source, NULL);
       
   930   g_source_unref (source);
       
   931 
       
   932   GST_DEBUG_OBJECT (bus, "New source %p", source);
       
   933   return id;
       
   934 }
       
   935 
       
   936 /**
       
   937  * gst_bus_add_watch:
       
   938  * @bus: a #GstBus to create the watch for
       
   939  * @func: A function to call when a message is received.
       
   940  * @user_data: user data passed to @func.
       
   941  *
       
   942  * Adds a bus watch to the default main context with the default priority.
       
   943  * This function is used to receive asynchronous messages in the main loop.
       
   944  *
       
   945  * The watch can be removed using g_source_remove() or by returning FALSE
       
   946  * from @func.
       
   947  *
       
   948  * Returns: The event source id.
       
   949  *
       
   950  * MT safe.
       
   951  */
       
   952 #ifdef __SYMBIAN32__
       
   953 EXPORT_C
       
   954 #endif
       
   955 
       
   956 guint
       
   957 gst_bus_add_watch (GstBus * bus, GstBusFunc func, gpointer user_data)
       
   958 {
       
   959   return gst_bus_add_watch_full (bus, G_PRIORITY_DEFAULT, func,
       
   960       user_data, NULL);
       
   961 }
       
   962 
       
   963 typedef struct
       
   964 {
       
   965   GMainLoop *loop;
       
   966   guint timeout_id;
       
   967   gboolean source_running;
       
   968   GstMessageType events;
       
   969   GstMessage *message;
       
   970 } GstBusPollData;
       
   971 
       
   972 static void
       
   973 poll_func (GstBus * bus, GstMessage * message, GstBusPollData * poll_data)
       
   974 {
       
   975   GstMessageType type;
       
   976 
       
   977   if (!g_main_loop_is_running (poll_data->loop)) {
       
   978     GST_DEBUG ("mainloop %p not running", poll_data->loop);
       
   979     return;
       
   980   }
       
   981 
       
   982   type = GST_MESSAGE_TYPE (message);
       
   983 
       
   984   if (type & poll_data->events) {
       
   985     g_return_if_fail (poll_data->message == NULL);
       
   986     /* keep ref to message */
       
   987     poll_data->message = gst_message_ref (message);
       
   988     GST_DEBUG ("mainloop %p quit", poll_data->loop);
       
   989     g_main_loop_quit (poll_data->loop);
       
   990   } else {
       
   991     GST_DEBUG ("type %08x does not match %08x", type, poll_data->events);
       
   992   }
       
   993 }
       
   994 
       
   995 static gboolean
       
   996 poll_timeout (GstBusPollData * poll_data)
       
   997 {
       
   998   GST_DEBUG ("mainloop %p quit", poll_data->loop);
       
   999   g_main_loop_quit (poll_data->loop);
       
  1000 
       
  1001   /* we don't remove the GSource as this would free our poll_data,
       
  1002    * which we still need */
       
  1003   return TRUE;
       
  1004 }
       
  1005 
       
  1006 static void
       
  1007 poll_destroy (GstBusPollData * poll_data, gpointer unused)
       
  1008 {
       
  1009   poll_data->source_running = FALSE;
       
  1010   if (!poll_data->timeout_id) {
       
  1011     g_main_loop_unref (poll_data->loop);
       
  1012     g_free (poll_data);
       
  1013   }
       
  1014 }
       
  1015 
       
  1016 static void
       
  1017 poll_destroy_timeout (GstBusPollData * poll_data)
       
  1018 {
       
  1019   poll_data->timeout_id = 0;
       
  1020   if (!poll_data->source_running) {
       
  1021     g_main_loop_unref (poll_data->loop);
       
  1022     g_free (poll_data);
       
  1023   }
       
  1024 }
       
  1025 
       
  1026 /**
       
  1027  * gst_bus_poll:
       
  1028  * @bus: a #GstBus
       
  1029  * @events: a mask of #GstMessageType, representing the set of message types to
       
  1030  * poll for.
       
  1031  * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll indefinitely.
       
  1032  *
       
  1033  * Poll the bus for messages. Will block while waiting for messages to come.
       
  1034  * You can specify a maximum time to poll with the @timeout parameter. If
       
  1035  * @timeout is negative, this function will block indefinitely.
       
  1036  *
       
  1037  * All messages not in @events will be popped off the bus and will be ignored.
       
  1038  *
       
  1039  * Because poll is implemented using the "message" signal enabled by
       
  1040  * gst_bus_add_signal_watch(), calling gst_bus_poll() will cause the "message"
       
  1041  * signal to be emitted for every message that poll sees. Thus a "message"
       
  1042  * signal handler will see the same messages that this function sees -- neither
       
  1043  * will steal messages from the other.
       
  1044  *
       
  1045  * This function will run a main loop from the default main context when
       
  1046  * polling.
       
  1047  *
       
  1048  * Returns: The message that was received, or NULL if the poll timed out.
       
  1049  * The message is taken from the bus and needs to be unreffed with
       
  1050  * gst_message_unref() after usage.
       
  1051  */
       
  1052 #ifdef __SYMBIAN32__
       
  1053 EXPORT_C
       
  1054 #endif
       
  1055 
       
  1056 GstMessage *
       
  1057 gst_bus_poll (GstBus * bus, GstMessageType events, GstClockTimeDiff timeout)
       
  1058 {
       
  1059   GstBusPollData *poll_data;
       
  1060   GstMessage *ret;
       
  1061   gulong id;
       
  1062 
       
  1063   poll_data = g_new0 (GstBusPollData, 1);
       
  1064   poll_data->source_running = TRUE;
       
  1065   poll_data->loop = g_main_loop_new (NULL, FALSE);
       
  1066   poll_data->events = events;
       
  1067   poll_data->message = NULL;
       
  1068 
       
  1069   if (timeout >= 0)
       
  1070     poll_data->timeout_id = g_timeout_add_full (G_PRIORITY_DEFAULT_IDLE,
       
  1071         timeout / GST_MSECOND, (GSourceFunc) poll_timeout, poll_data,
       
  1072         (GDestroyNotify) poll_destroy_timeout);
       
  1073   else
       
  1074     poll_data->timeout_id = 0;
       
  1075 
       
  1076   id = g_signal_connect_data (bus, "message", G_CALLBACK (poll_func), poll_data,
       
  1077       (GClosureNotify) poll_destroy, 0);
       
  1078 
       
  1079   /* these can be nested, so it's ok */
       
  1080   gst_bus_add_signal_watch (bus);
       
  1081 
       
  1082   GST_DEBUG ("running mainloop %p", poll_data->loop);
       
  1083   g_main_loop_run (poll_data->loop);
       
  1084   GST_DEBUG ("mainloop stopped %p", poll_data->loop);
       
  1085 
       
  1086   gst_bus_remove_signal_watch (bus);
       
  1087 
       
  1088   /* holds a ref */
       
  1089   ret = poll_data->message;
       
  1090 
       
  1091   if (poll_data->timeout_id)
       
  1092     g_source_remove (poll_data->timeout_id);
       
  1093 
       
  1094   /* poll_data will be freed now */
       
  1095   g_signal_handler_disconnect (bus, id);
       
  1096 
       
  1097   GST_DEBUG_OBJECT (bus, "finished poll with message %p", ret);
       
  1098 
       
  1099   return ret;
       
  1100 }
       
  1101 
       
  1102 /**
       
  1103  * gst_bus_async_signal_func:
       
  1104  * @bus: a #GstBus
       
  1105  * @message: the #GstMessage received
       
  1106  * @data: user data
       
  1107  *
       
  1108  * A helper #GstBusFunc that can be used to convert all asynchronous messages
       
  1109  * into signals.
       
  1110  *
       
  1111  * Returns: TRUE
       
  1112  */
       
  1113 #ifdef __SYMBIAN32__
       
  1114 EXPORT_C
       
  1115 #endif
       
  1116 
       
  1117 gboolean
       
  1118 gst_bus_async_signal_func (GstBus * bus, GstMessage * message, gpointer data)
       
  1119 {
       
  1120   GQuark detail = 0;
       
  1121 
       
  1122   g_return_val_if_fail (GST_IS_BUS (bus), TRUE);
       
  1123   g_return_val_if_fail (message != NULL, TRUE);
       
  1124 
       
  1125   detail = gst_message_type_to_quark (GST_MESSAGE_TYPE (message));
       
  1126 
       
  1127   g_signal_emit (bus, gst_bus_signals[ASYNC_MESSAGE], detail, message);
       
  1128 
       
  1129   /* we never remove this source based on signal emission return values */
       
  1130   return TRUE;
       
  1131 }
       
  1132 
       
  1133 /**
       
  1134  * gst_bus_sync_signal_handler:
       
  1135  * @bus: a #GstBus
       
  1136  * @message: the #GstMessage received
       
  1137  * @data: user data
       
  1138  *
       
  1139  * A helper GstBusSyncHandler that can be used to convert all synchronous
       
  1140  * messages into signals.
       
  1141  *
       
  1142  * Returns: GST_BUS_PASS
       
  1143  */
       
  1144 #ifdef __SYMBIAN32__
       
  1145 EXPORT_C
       
  1146 #endif
       
  1147 
       
  1148 GstBusSyncReply
       
  1149 gst_bus_sync_signal_handler (GstBus * bus, GstMessage * message, gpointer data)
       
  1150 {
       
  1151   GQuark detail = 0;
       
  1152 
       
  1153   g_return_val_if_fail (GST_IS_BUS (bus), GST_BUS_DROP);
       
  1154   g_return_val_if_fail (message != NULL, GST_BUS_DROP);
       
  1155 
       
  1156   detail = gst_message_type_to_quark (GST_MESSAGE_TYPE (message));
       
  1157 
       
  1158   g_signal_emit (bus, gst_bus_signals[SYNC_MESSAGE], detail, message);
       
  1159 
       
  1160   return GST_BUS_PASS;
       
  1161 }
       
  1162 
       
  1163 /**
       
  1164  * gst_bus_enable_sync_message_emission:
       
  1165  * @bus: a #GstBus on which you want to receive the "sync-message" signal
       
  1166  *
       
  1167  * Instructs GStreamer to emit the "sync-message" signal after running the bus's
       
  1168  * sync handler. This function is here so that code can ensure that they can
       
  1169  * synchronously receive messages without having to affect what the bin's sync
       
  1170  * handler is. 
       
  1171  *
       
  1172  * This function may be called multiple times. To clean up, the caller is
       
  1173  * responsible for calling gst_bus_disable_sync_message_emission() as many times
       
  1174  * as this function is called.
       
  1175  *
       
  1176  * While this function looks similar to gst_bus_add_signal_watch(), it is not
       
  1177  * exactly the same -- this function enables <emphasis>synchronous</emphasis> emission of
       
  1178  * signals when messages arrive; gst_bus_add_signal_watch() adds an idle callback
       
  1179  * to pop messages off the bus <emphasis>asynchronously</emphasis>. The sync-message signal
       
  1180  * comes from the thread of whatever object posted the message; the "message"
       
  1181  * signal is marshalled to the main thread via the main loop.
       
  1182  *
       
  1183  * MT safe.
       
  1184  */
       
  1185 #ifdef __SYMBIAN32__
       
  1186 EXPORT_C
       
  1187 #endif
       
  1188 
       
  1189 void
       
  1190 gst_bus_enable_sync_message_emission (GstBus * bus)
       
  1191 {
       
  1192   g_return_if_fail (GST_IS_BUS (bus));
       
  1193 
       
  1194   GST_OBJECT_LOCK (bus);
       
  1195 
       
  1196   bus->priv->num_sync_message_emitters++;
       
  1197 
       
  1198   GST_OBJECT_UNLOCK (bus);
       
  1199 }
       
  1200 
       
  1201 /**
       
  1202  * gst_bus_disable_sync_message_emission:
       
  1203  * @bus: a #GstBus on which you previously called
       
  1204  * gst_bus_enable_sync_message_emission()
       
  1205  *
       
  1206  * Instructs GStreamer to stop emitting the "sync-message" signal for this bus.
       
  1207  * See gst_bus_enable_sync_message_emission() for more information.
       
  1208  *
       
  1209  * In the event that multiple pieces of code have called
       
  1210  * gst_bus_enable_sync_message_emission(), the sync-message emissions will only
       
  1211  * be stopped after all calls to gst_bus_enable_sync_message_emission() were
       
  1212  * "cancelled" by calling this function. In this way the semantics are exactly
       
  1213  * the same as gst_object_ref() that which calls enable should also call
       
  1214  * disable.
       
  1215  *
       
  1216  * MT safe.
       
  1217  */
       
  1218 #ifdef __SYMBIAN32__
       
  1219 EXPORT_C
       
  1220 #endif
       
  1221 
       
  1222 void
       
  1223 gst_bus_disable_sync_message_emission (GstBus * bus)
       
  1224 {
       
  1225   g_return_if_fail (GST_IS_BUS (bus));
       
  1226 
       
  1227   g_return_if_fail (bus->num_signal_watchers == 0);
       
  1228 
       
  1229   GST_OBJECT_LOCK (bus);
       
  1230 
       
  1231   bus->priv->num_sync_message_emitters--;
       
  1232 
       
  1233   GST_OBJECT_UNLOCK (bus);
       
  1234 }
       
  1235 
       
  1236 /**
       
  1237  * gst_bus_add_signal_watch_full:
       
  1238  * @bus: a #GstBus on which you want to receive the "message" signal
       
  1239  * @priority: The priority of the watch.
       
  1240  *
       
  1241  * Adds a bus signal watch to the default main context with the given priority.
       
  1242  * After calling this statement, the bus will emit the "message" signal for each
       
  1243  * message posted on the bus when the main loop is running.
       
  1244  *
       
  1245  * This function may be called multiple times. To clean up, the caller is
       
  1246  * responsible for calling gst_bus_remove_signal_watch() as many times as this
       
  1247  * function is called.
       
  1248  *
       
  1249  * MT safe.
       
  1250  */
       
  1251 #ifdef __SYMBIAN32__
       
  1252 EXPORT_C
       
  1253 #endif
       
  1254 
       
  1255 void
       
  1256 gst_bus_add_signal_watch_full (GstBus * bus, gint priority)
       
  1257 {
       
  1258   g_return_if_fail (GST_IS_BUS (bus));
       
  1259 
       
  1260   /* I know the callees don't take this lock, so go ahead and abuse it */
       
  1261   GST_OBJECT_LOCK (bus);
       
  1262 
       
  1263   if (bus->num_signal_watchers > 0)
       
  1264     goto done;
       
  1265 
       
  1266   g_assert (bus->signal_watch_id == 0);
       
  1267 
       
  1268   bus->signal_watch_id =
       
  1269       gst_bus_add_watch_full (bus, priority, gst_bus_async_signal_func, NULL,
       
  1270       NULL);
       
  1271 
       
  1272 done:
       
  1273 
       
  1274   bus->num_signal_watchers++;
       
  1275 
       
  1276   GST_OBJECT_UNLOCK (bus);
       
  1277 }
       
  1278 
       
  1279 /**
       
  1280  * gst_bus_add_signal_watch:
       
  1281  * @bus: a #GstBus on which you want to receive the "message" signal
       
  1282  *
       
  1283  * Adds a bus signal watch to the default main context with the default
       
  1284  * priority.
       
  1285  * After calling this statement, the bus will emit the "message" signal for each
       
  1286  * message posted on the bus.
       
  1287  *
       
  1288  * This function may be called multiple times. To clean up, the caller is
       
  1289  * responsible for calling gst_bus_remove_signal_watch() as many times as this
       
  1290  * function is called.
       
  1291  *
       
  1292  * MT safe.
       
  1293  */
       
  1294 #ifdef __SYMBIAN32__
       
  1295 EXPORT_C
       
  1296 #endif
       
  1297 
       
  1298  
       
  1299 void
       
  1300 gst_bus_add_signal_watch (GstBus * bus)
       
  1301 {
       
  1302   gst_bus_add_signal_watch_full (bus, G_PRIORITY_DEFAULT);
       
  1303 }
       
  1304 
       
  1305 /**
       
  1306  * gst_bus_remove_signal_watch:
       
  1307  * @bus: a #GstBus you previously added a signal watch to
       
  1308  *
       
  1309  * Removes a signal watch previously added with gst_bus_add_signal_watch().
       
  1310  *
       
  1311  * MT safe.
       
  1312  */
       
  1313 #ifdef __SYMBIAN32__
       
  1314 EXPORT_C
       
  1315 #endif
       
  1316 
       
  1317 void
       
  1318 gst_bus_remove_signal_watch (GstBus * bus)
       
  1319 {
       
  1320   g_return_if_fail (GST_IS_BUS (bus));
       
  1321 
       
  1322   /* I know the callees don't take this lock, so go ahead and abuse it */
       
  1323   GST_OBJECT_LOCK (bus);
       
  1324 
       
  1325   if (bus->num_signal_watchers == 0)
       
  1326     goto error;
       
  1327 
       
  1328   bus->num_signal_watchers--;
       
  1329 
       
  1330   if (bus->num_signal_watchers > 0)
       
  1331     goto done;
       
  1332 
       
  1333   g_source_remove (bus->signal_watch_id);
       
  1334   bus->signal_watch_id = 0;
       
  1335 
       
  1336 done:
       
  1337   GST_OBJECT_UNLOCK (bus);
       
  1338   return;
       
  1339 
       
  1340 error:
       
  1341   {
       
  1342     g_critical ("Bus %s has no signal watches attached", GST_OBJECT_NAME (bus));
       
  1343     GST_OBJECT_UNLOCK (bus);
       
  1344     return;
       
  1345   }
       
  1346 }