gstreamer_core/gst/gstbus.c
branchRCL_3
changeset 29 567bb019e3e3
parent 0 0e761a78d257
child 30 7e817e7e631c
--- a/gstreamer_core/gst/gstbus.c	Wed Mar 31 22:03:18 2010 +0300
+++ b/gstreamer_core/gst/gstbus.c	Tue Aug 31 15:30:33 2010 +0300
@@ -95,51 +95,20 @@
 static void gst_bus_init (GstBus * bus);
 static void gst_bus_dispose (GObject * object);
 
-static void gst_bus_set_property (GObject * object, guint prop_id,
-    const GValue * value, GParamSpec * pspec);
-static void gst_bus_get_property (GObject * object, guint prop_id,
-    GValue * value, GParamSpec * pspec);
+static void gst_bus_set_main_context (GstBus * bus, GMainContext * ctx);
 
 static GstObjectClass *parent_class = NULL;
 static guint gst_bus_signals[LAST_SIGNAL] = { 0 };
 
-/* the context we wakeup when we posted a message on the bus */
-static GMainContext *main_context;
-
 struct _GstBusPrivate
 {
   guint num_sync_message_emitters;
-
   GCond *queue_cond;
+  GSource *watch_id;
+  GMainContext *main_context;
 };
-#ifdef __SYMBIAN32__
-EXPORT_C
-#endif
-
-
-GType
-gst_bus_get_type (void)
-{
-  static GType bus_type = 0;
 
-  if (G_UNLIKELY (bus_type == 0)) {
-    static const GTypeInfo bus_info = {
-      sizeof (GstBusClass),
-      NULL,
-      NULL,
-      (GClassInitFunc) gst_bus_class_init,
-      NULL,
-      NULL,
-      sizeof (GstBus),
-      0,
-      (GInstanceInitFunc) gst_bus_init,
-      NULL
-    };
-
-    bus_type = g_type_register_static (GST_TYPE_OBJECT, "GstBus", &bus_info, 0);
-  }
-  return bus_type;
-}
+G_DEFINE_TYPE (GstBus, gst_bus, GST_TYPE_OBJECT);
 
 /* fixme: do something about this */
 static void
@@ -163,8 +132,8 @@
     data2 = closure->data;
   }
   callback =
-      (marshalfunc_VOID__MINIOBJECT) (marshal_data ? marshal_data : cc->
-      callback);
+      (marshalfunc_VOID__MINIOBJECT) (marshal_data ? marshal_data :
+      cc->callback);
 
   callback (data1, gst_value_get_mini_object (param_values + 1), data2);
 }
@@ -172,20 +141,11 @@
 static void
 gst_bus_class_init (GstBusClass * klass)
 {
-  GObjectClass *gobject_class;
-  GstObjectClass *gstobject_class;
-
-  gobject_class = (GObjectClass *) klass;
-  gstobject_class = (GstObjectClass *) klass;
+  GObjectClass *gobject_class = (GObjectClass *) klass;
 
   parent_class = g_type_class_peek_parent (klass);
 
-  if (!g_thread_supported ())
-    g_thread_init (NULL);
-
   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_bus_dispose);
-  gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_bus_set_property);
-  gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_bus_get_property);
 
   /**
    * GstBus::sync-message:
@@ -223,8 +183,6 @@
       G_STRUCT_OFFSET (GstBusClass, message), NULL, NULL,
       marshal_VOID__MINIOBJECT, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
 
-  main_context = g_main_context_default ();
-
   g_type_class_add_private (klass, sizeof (GstBusPrivate));
 }
 
@@ -243,9 +201,7 @@
 static void
 gst_bus_dispose (GObject * object)
 {
-  GstBus *bus;
-
-  bus = GST_BUS (object);
+  GstBus *bus = GST_BUS (object);
 
   if (bus->queue) {
     GstMessage *message;
@@ -265,37 +221,48 @@
     bus->priv->queue_cond = NULL;
   }
 
+  if (bus->priv->main_context) {
+    g_main_context_unref (bus->priv->main_context);
+    bus->priv->main_context = NULL;
+  }
+
   G_OBJECT_CLASS (parent_class)->dispose (object);
 }
 
 static void
-gst_bus_set_property (GObject * object, guint prop_id,
-    const GValue * value, GParamSpec * pspec)
+gst_bus_wakeup_main_context (GstBus * bus)
 {
-  GstBus *bus;
+  GMainContext *ctx;
 
-  bus = GST_BUS (object);
+  GST_OBJECT_LOCK (bus);
+  if ((ctx = bus->priv->main_context))
+    g_main_context_ref (ctx);
+  GST_OBJECT_UNLOCK (bus);
 
-  switch (prop_id) {
-    default:
-      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
-      break;
-  }
+  g_main_context_wakeup (ctx);
+
+  if (ctx)
+    g_main_context_unref (ctx);
 }
 
 static void
-gst_bus_get_property (GObject * object, guint prop_id,
-    GValue * value, GParamSpec * pspec)
+gst_bus_set_main_context (GstBus * bus, GMainContext * ctx)
 {
-  GstBus *bus;
+  GST_OBJECT_LOCK (bus);
+
+  if (bus->priv->main_context != NULL) {
+    g_main_context_unref (bus->priv->main_context);
+    bus->priv->main_context = NULL;
+  }
 
-  bus = GST_BUS (object);
+  if (ctx != NULL) {
+    bus->priv->main_context = g_main_context_ref (ctx);
+  }
 
-  switch (prop_id) {
-    default:
-      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
-      break;
-  }
+  GST_DEBUG_OBJECT (bus, "setting main context to %p, GLib default context: %p",
+      ctx, g_main_context_default ());
+
+  GST_OBJECT_UNLOCK (bus);
 }
 
 /**
@@ -387,8 +354,7 @@
       g_mutex_unlock (bus->queue_lock);
       GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
 
-      /* FIXME cannot assume sources are only in the default context */
-      g_main_context_wakeup (main_context);
+      gst_bus_wakeup_main_context (bus);
 
       break;
     case GST_BUS_ASYNC:
@@ -412,8 +378,7 @@
       g_cond_broadcast (bus->priv->queue_cond);
       g_mutex_unlock (bus->queue_lock);
 
-      /* FIXME cannot assume sources are only in the default context */
-      g_main_context_wakeup (main_context);
+      gst_bus_wakeup_main_context (bus);
 
       /* now block till the message is freed */
       g_cond_wait (cond, lock);
@@ -781,6 +746,7 @@
 {
   GSource source;
   GstBus *bus;
+  gboolean inited;
 } GstBusSource;
 
 static gboolean
@@ -788,6 +754,14 @@
 {
   GstBusSource *bsrc = (GstBusSource *) source;
 
+  /* we do this here now that we know that we're attached to a main context
+   * (we don't support detaching a source from a main context and then
+   * re-attaching it to a different main context) */
+  if (G_UNLIKELY (!bsrc->inited)) {
+    gst_bus_set_main_context (bsrc->bus, g_source_get_context (source));
+    bsrc->inited = TRUE;
+  }
+
   *timeout = -1;
   return gst_bus_have_pending (bsrc->bus);
 }
@@ -817,7 +791,11 @@
   g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
 
   message = gst_bus_pop (bus);
-  g_return_val_if_fail (message != NULL, FALSE);
+
+  /* The message queue might be empty if some other thread or callback set
+   * the bus to flushing between check/prepare and dispatch */
+  if (G_UNLIKELY (message == NULL))
+    return TRUE;
 
   if (!handler)
     goto no_handler;
@@ -844,7 +822,18 @@
 gst_bus_source_finalize (GSource * source)
 {
   GstBusSource *bsource = (GstBusSource *) source;
+  GstBus *bus;
 
+  bus = bsource->bus;
+
+  GST_DEBUG_OBJECT (bus, "finalize source %p", source);
+
+  GST_OBJECT_LOCK (bus);
+  if (bus->priv->watch_id == source)
+    bus->priv->watch_id = NULL;
+  GST_OBJECT_UNLOCK (bus);
+
+  gst_bus_set_main_context (bsource->bus, NULL);
   gst_object_unref (bsource->bus);
   bsource->bus = NULL;
 }
@@ -879,12 +868,44 @@
 
   source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
       sizeof (GstBusSource));
-  gst_object_ref (bus);
-  source->bus = bus;
+  source->bus = gst_object_ref (bus);
+  source->inited = FALSE;
 
   return (GSource *) source;
 }
 
+/* must be called with the bus OBJECT LOCK */
+static guint
+gst_bus_add_watch_full_unlocked (GstBus * bus, gint priority,
+    GstBusFunc func, gpointer user_data, GDestroyNotify notify)
+{
+  guint id;
+  GSource *source;
+
+  if (bus->priv->watch_id) {
+    GST_ERROR_OBJECT (bus,
+        "Tried to add new watch while one was already there");
+    return 0;
+  }
+
+  source = gst_bus_create_watch (bus);
+
+  if (priority != G_PRIORITY_DEFAULT)
+    g_source_set_priority (source, priority);
+
+  g_source_set_callback (source, (GSourceFunc) func, user_data, notify);
+
+  id = g_source_attach (source, NULL);
+  g_source_unref (source);
+
+  if (id) {
+    bus->priv->watch_id = source;
+  }
+
+  GST_DEBUG_OBJECT (bus, "New source %p with id %u", source, id);
+  return id;
+}
+
 /**
  * gst_bus_add_watch_full:
  * @bus: a #GstBus to create the watch for.
@@ -895,6 +916,8 @@
  *
  * Adds a bus watch to the default main context with the given @priority.
  * This function is used to receive asynchronous messages in the main loop.
+ * There can only be a single bus watch per bus, you must remove it before you
+ * can set a new one.
  *
  * When @func is called, the message belongs to the caller; if you want to
  * keep a copy of it, call gst_message_ref() before leaving @func.
@@ -915,21 +938,13 @@
     GstBusFunc func, gpointer user_data, GDestroyNotify notify)
 {
   guint id;
-  GSource *source;
 
   g_return_val_if_fail (GST_IS_BUS (bus), 0);
 
-  source = gst_bus_create_watch (bus);
-
-  if (priority != G_PRIORITY_DEFAULT)
-    g_source_set_priority (source, priority);
+  GST_OBJECT_LOCK (bus);
+  id = gst_bus_add_watch_full_unlocked (bus, priority, func, user_data, notify);
+  GST_OBJECT_UNLOCK (bus);
 
-  g_source_set_callback (source, (GSourceFunc) func, user_data, notify);
-
-  id = g_source_attach (source, NULL);
-  g_source_unref (source);
-
-  GST_DEBUG_OBJECT (bus, "New source %p", source);
   return id;
 }
 
@@ -941,6 +956,8 @@
  *
  * Adds a bus watch to the default main context with the default priority.
  * This function is used to receive asynchronous messages in the main loop.
+ * There can only be a single bus watch per bus, you must remove it before you
+ * can set a new one.
  *
  * The watch can be removed using g_source_remove() or by returning FALSE
  * from @func.
@@ -1028,7 +1045,8 @@
  * @bus: a #GstBus
  * @events: a mask of #GstMessageType, representing the set of message types to
  * poll for.
- * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll indefinitely.
+ * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll
+ * indefinitely.
  *
  * Poll the bus for messages. Will block while waiting for messages to come.
  * You can specify a maximum time to poll with the @timeout parameter. If
@@ -1045,6 +1063,23 @@
  * This function will run a main loop from the default main context when
  * polling.
  *
+ * You should never use this function, since it is pure evil. This is
+ * especially true for GUI applications based on Gtk+ or Qt, but also for any
+ * other non-trivial application that uses the GLib main loop. As this function
+ * runs a GLib main loop, any callback attached to the default GLib main
+ * context may be invoked. This could be timeouts, GUI events, I/O events etc.;
+ * even if gst_bus_poll() is called with a 0 timeout. Any of these callbacks
+ * may do things you do not expect, e.g. destroy the main application window or
+ * some other resource; change other application state; display a dialog and
+ * run another main loop until the user clicks it away. In short, using this
+ * function may add a lot of complexity to your code through unexpected
+ * re-entrancy and unexpected changes to your application's state.
+ *
+ * For 0 timeouts use gst_bus_pop_filtered() instead of this function; for
+ * other short timeouts use gst_bus_timed_pop_filtered(); everything else is
+ * better handled by setting up an asynchronous bus watch and doing things
+ * from there.
+ *
  * Returns: The message that was received, or NULL if the poll timed out.
  * The message is taken from the bus and needs to be unreffed with
  * gst_message_unref() after usage.
@@ -1192,9 +1227,7 @@
   g_return_if_fail (GST_IS_BUS (bus));
 
   GST_OBJECT_LOCK (bus);
-
   bus->priv->num_sync_message_emitters++;
-
   GST_OBJECT_UNLOCK (bus);
 }
 
@@ -1223,13 +1256,10 @@
 gst_bus_disable_sync_message_emission (GstBus * bus)
 {
   g_return_if_fail (GST_IS_BUS (bus));
-
   g_return_if_fail (bus->num_signal_watchers == 0);
 
   GST_OBJECT_LOCK (bus);
-
   bus->priv->num_sync_message_emitters--;
-
   GST_OBJECT_UNLOCK (bus);
 }
 
@@ -1246,6 +1276,9 @@
  * responsible for calling gst_bus_remove_signal_watch() as many times as this
  * function is called.
  *
+ * There can only be a single bus watch per bus, you most remove all signal watch
+ * before you can set another type of watch.
+ *
  * MT safe.
  */
 #ifdef __SYMBIAN32__
@@ -1263,17 +1296,30 @@
   if (bus->num_signal_watchers > 0)
     goto done;
 
+  /* this should not fail because the counter above takes care of it */
   g_assert (bus->signal_watch_id == 0);
 
   bus->signal_watch_id =
-      gst_bus_add_watch_full (bus, priority, gst_bus_async_signal_func, NULL,
-      NULL);
+      gst_bus_add_watch_full_unlocked (bus, priority, gst_bus_async_signal_func,
+      NULL, NULL);
+
+  if (G_UNLIKELY (bus->signal_watch_id == 0))
+    goto add_failed;
 
 done:
 
   bus->num_signal_watchers++;
 
   GST_OBJECT_UNLOCK (bus);
+  return;
+
+  /* ERRORS */
+add_failed:
+  {
+    g_critical ("Could not add signal watch to bus %s", GST_OBJECT_NAME (bus));
+    GST_OBJECT_UNLOCK (bus);
+    return;
+  }
 }
 
 /**
@@ -1295,7 +1341,6 @@
 EXPORT_C
 #endif
 
- 
 void
 gst_bus_add_signal_watch (GstBus * bus)
 {
@@ -1317,6 +1362,8 @@
 void
 gst_bus_remove_signal_watch (GstBus * bus)
 {
+  guint id = 0;
+
   g_return_if_fail (GST_IS_BUS (bus));
 
   /* I know the callees don't take this lock, so go ahead and abuse it */
@@ -1330,13 +1377,20 @@
   if (bus->num_signal_watchers > 0)
     goto done;
 
-  g_source_remove (bus->signal_watch_id);
+  id = bus->signal_watch_id;
   bus->signal_watch_id = 0;
 
+  GST_DEBUG_OBJECT (bus, "removing signal watch %u", id);
+
 done:
   GST_OBJECT_UNLOCK (bus);
+
+  if (id)
+    g_source_remove (id);
+
   return;
 
+  /* ERRORS */
 error:
   {
     g_critical ("Bus %s has no signal watches attached", GST_OBJECT_NAME (bus));