diff -r 71e347f905f2 -r 4a7fac7dd34a gstreamer_core/gst/gstbus.c --- a/gstreamer_core/gst/gstbus.c Fri Mar 19 09:35:09 2010 +0200 +++ b/gstreamer_core/gst/gstbus.c Fri Apr 16 15:15:52 2010 +0300 @@ -95,51 +95,20 @@ static void gst_bus_init (GstBus * bus); static void gst_bus_dispose (GObject * object); -static void gst_bus_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec); -static void gst_bus_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec); +static void gst_bus_set_main_context (GstBus * bus, GMainContext * ctx); static GstObjectClass *parent_class = NULL; static guint gst_bus_signals[LAST_SIGNAL] = { 0 }; -/* the context we wakeup when we posted a message on the bus */ -static GMainContext *main_context; - struct _GstBusPrivate { guint num_sync_message_emitters; - GCond *queue_cond; + GSource *watch_id; + GMainContext *main_context; }; -#ifdef __SYMBIAN32__ -EXPORT_C -#endif - - -GType -gst_bus_get_type (void) -{ - static GType bus_type = 0; - if (G_UNLIKELY (bus_type == 0)) { - static const GTypeInfo bus_info = { - sizeof (GstBusClass), - NULL, - NULL, - (GClassInitFunc) gst_bus_class_init, - NULL, - NULL, - sizeof (GstBus), - 0, - (GInstanceInitFunc) gst_bus_init, - NULL - }; - - bus_type = g_type_register_static (GST_TYPE_OBJECT, "GstBus", &bus_info, 0); - } - return bus_type; -} +G_DEFINE_TYPE (GstBus, gst_bus, GST_TYPE_OBJECT); /* fixme: do something about this */ static void @@ -163,8 +132,8 @@ data2 = closure->data; } callback = - (marshalfunc_VOID__MINIOBJECT) (marshal_data ? marshal_data : cc-> - callback); + (marshalfunc_VOID__MINIOBJECT) (marshal_data ? marshal_data : + cc->callback); callback (data1, gst_value_get_mini_object (param_values + 1), data2); } @@ -172,20 +141,11 @@ static void gst_bus_class_init (GstBusClass * klass) { - GObjectClass *gobject_class; - GstObjectClass *gstobject_class; - - gobject_class = (GObjectClass *) klass; - gstobject_class = (GstObjectClass *) klass; + GObjectClass *gobject_class = (GObjectClass *) klass; parent_class = g_type_class_peek_parent (klass); - if (!g_thread_supported ()) - g_thread_init (NULL); - gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_bus_dispose); - gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_bus_set_property); - gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_bus_get_property); /** * GstBus::sync-message: @@ -223,8 +183,6 @@ G_STRUCT_OFFSET (GstBusClass, message), NULL, NULL, marshal_VOID__MINIOBJECT, G_TYPE_NONE, 1, GST_TYPE_MESSAGE); - main_context = g_main_context_default (); - g_type_class_add_private (klass, sizeof (GstBusPrivate)); } @@ -243,9 +201,7 @@ static void gst_bus_dispose (GObject * object) { - GstBus *bus; - - bus = GST_BUS (object); + GstBus *bus = GST_BUS (object); if (bus->queue) { GstMessage *message; @@ -265,37 +221,48 @@ bus->priv->queue_cond = NULL; } + if (bus->priv->main_context) { + g_main_context_unref (bus->priv->main_context); + bus->priv->main_context = NULL; + } + G_OBJECT_CLASS (parent_class)->dispose (object); } static void -gst_bus_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec) +gst_bus_wakeup_main_context (GstBus * bus) { - GstBus *bus; + GMainContext *ctx; - bus = GST_BUS (object); + GST_OBJECT_LOCK (bus); + if ((ctx = bus->priv->main_context)) + g_main_context_ref (ctx); + GST_OBJECT_UNLOCK (bus); - switch (prop_id) { - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } + g_main_context_wakeup (ctx); + + if (ctx) + g_main_context_unref (ctx); } static void -gst_bus_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec) +gst_bus_set_main_context (GstBus * bus, GMainContext * ctx) { - GstBus *bus; + GST_OBJECT_LOCK (bus); + + if (bus->priv->main_context != NULL) { + g_main_context_unref (bus->priv->main_context); + bus->priv->main_context = NULL; + } - bus = GST_BUS (object); + if (ctx != NULL) { + bus->priv->main_context = g_main_context_ref (ctx); + } - switch (prop_id) { - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } + GST_DEBUG_OBJECT (bus, "setting main context to %p, GLib default context: %p", + ctx, g_main_context_default ()); + + GST_OBJECT_UNLOCK (bus); } /** @@ -387,8 +354,7 @@ g_mutex_unlock (bus->queue_lock); GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message); - /* FIXME cannot assume sources are only in the default context */ - g_main_context_wakeup (main_context); + gst_bus_wakeup_main_context (bus); break; case GST_BUS_ASYNC: @@ -412,8 +378,7 @@ g_cond_broadcast (bus->priv->queue_cond); g_mutex_unlock (bus->queue_lock); - /* FIXME cannot assume sources are only in the default context */ - g_main_context_wakeup (main_context); + gst_bus_wakeup_main_context (bus); /* now block till the message is freed */ g_cond_wait (cond, lock); @@ -781,6 +746,7 @@ { GSource source; GstBus *bus; + gboolean inited; } GstBusSource; static gboolean @@ -788,6 +754,14 @@ { GstBusSource *bsrc = (GstBusSource *) source; + /* we do this here now that we know that we're attached to a main context + * (we don't support detaching a source from a main context and then + * re-attaching it to a different main context) */ + if (G_UNLIKELY (!bsrc->inited)) { + gst_bus_set_main_context (bsrc->bus, g_source_get_context (source)); + bsrc->inited = TRUE; + } + *timeout = -1; return gst_bus_have_pending (bsrc->bus); } @@ -817,7 +791,11 @@ g_return_val_if_fail (GST_IS_BUS (bus), FALSE); message = gst_bus_pop (bus); - g_return_val_if_fail (message != NULL, FALSE); + + /* The message queue might be empty if some other thread or callback set + * the bus to flushing between check/prepare and dispatch */ + if (G_UNLIKELY (message == NULL)) + return TRUE; if (!handler) goto no_handler; @@ -844,7 +822,18 @@ gst_bus_source_finalize (GSource * source) { GstBusSource *bsource = (GstBusSource *) source; + GstBus *bus; + bus = bsource->bus; + + GST_DEBUG_OBJECT (bus, "finalize source %p", source); + + GST_OBJECT_LOCK (bus); + if (bus->priv->watch_id == source) + bus->priv->watch_id = NULL; + GST_OBJECT_UNLOCK (bus); + + gst_bus_set_main_context (bsource->bus, NULL); gst_object_unref (bsource->bus); bsource->bus = NULL; } @@ -879,12 +868,44 @@ source = (GstBusSource *) g_source_new (&gst_bus_source_funcs, sizeof (GstBusSource)); - gst_object_ref (bus); - source->bus = bus; + source->bus = gst_object_ref (bus); + source->inited = FALSE; return (GSource *) source; } +/* must be called with the bus OBJECT LOCK */ +static guint +gst_bus_add_watch_full_unlocked (GstBus * bus, gint priority, + GstBusFunc func, gpointer user_data, GDestroyNotify notify) +{ + guint id; + GSource *source; + + if (bus->priv->watch_id) { + GST_ERROR_OBJECT (bus, + "Tried to add new watch while one was already there"); + return 0; + } + + source = gst_bus_create_watch (bus); + + if (priority != G_PRIORITY_DEFAULT) + g_source_set_priority (source, priority); + + g_source_set_callback (source, (GSourceFunc) func, user_data, notify); + + id = g_source_attach (source, NULL); + g_source_unref (source); + + if (id) { + bus->priv->watch_id = source; + } + + GST_DEBUG_OBJECT (bus, "New source %p with id %u", source, id); + return id; +} + /** * gst_bus_add_watch_full: * @bus: a #GstBus to create the watch for. @@ -895,6 +916,8 @@ * * Adds a bus watch to the default main context with the given @priority. * This function is used to receive asynchronous messages in the main loop. + * There can only be a single bus watch per bus, you must remove it before you + * can set a new one. * * When @func is called, the message belongs to the caller; if you want to * keep a copy of it, call gst_message_ref() before leaving @func. @@ -915,21 +938,13 @@ GstBusFunc func, gpointer user_data, GDestroyNotify notify) { guint id; - GSource *source; g_return_val_if_fail (GST_IS_BUS (bus), 0); - source = gst_bus_create_watch (bus); - - if (priority != G_PRIORITY_DEFAULT) - g_source_set_priority (source, priority); + GST_OBJECT_LOCK (bus); + id = gst_bus_add_watch_full_unlocked (bus, priority, func, user_data, notify); + GST_OBJECT_UNLOCK (bus); - g_source_set_callback (source, (GSourceFunc) func, user_data, notify); - - id = g_source_attach (source, NULL); - g_source_unref (source); - - GST_DEBUG_OBJECT (bus, "New source %p", source); return id; } @@ -941,6 +956,8 @@ * * Adds a bus watch to the default main context with the default priority. * This function is used to receive asynchronous messages in the main loop. + * There can only be a single bus watch per bus, you must remove it before you + * can set a new one. * * The watch can be removed using g_source_remove() or by returning FALSE * from @func. @@ -1028,7 +1045,8 @@ * @bus: a #GstBus * @events: a mask of #GstMessageType, representing the set of message types to * poll for. - * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll indefinitely. + * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll + * indefinitely. * * Poll the bus for messages. Will block while waiting for messages to come. * You can specify a maximum time to poll with the @timeout parameter. If @@ -1045,6 +1063,23 @@ * This function will run a main loop from the default main context when * polling. * + * You should never use this function, since it is pure evil. This is + * especially true for GUI applications based on Gtk+ or Qt, but also for any + * other non-trivial application that uses the GLib main loop. As this function + * runs a GLib main loop, any callback attached to the default GLib main + * context may be invoked. This could be timeouts, GUI events, I/O events etc.; + * even if gst_bus_poll() is called with a 0 timeout. Any of these callbacks + * may do things you do not expect, e.g. destroy the main application window or + * some other resource; change other application state; display a dialog and + * run another main loop until the user clicks it away. In short, using this + * function may add a lot of complexity to your code through unexpected + * re-entrancy and unexpected changes to your application's state. + * + * For 0 timeouts use gst_bus_pop_filtered() instead of this function; for + * other short timeouts use gst_bus_timed_pop_filtered(); everything else is + * better handled by setting up an asynchronous bus watch and doing things + * from there. + * * Returns: The message that was received, or NULL if the poll timed out. * The message is taken from the bus and needs to be unreffed with * gst_message_unref() after usage. @@ -1192,9 +1227,7 @@ g_return_if_fail (GST_IS_BUS (bus)); GST_OBJECT_LOCK (bus); - bus->priv->num_sync_message_emitters++; - GST_OBJECT_UNLOCK (bus); } @@ -1223,13 +1256,10 @@ gst_bus_disable_sync_message_emission (GstBus * bus) { g_return_if_fail (GST_IS_BUS (bus)); - g_return_if_fail (bus->num_signal_watchers == 0); GST_OBJECT_LOCK (bus); - bus->priv->num_sync_message_emitters--; - GST_OBJECT_UNLOCK (bus); } @@ -1246,6 +1276,9 @@ * responsible for calling gst_bus_remove_signal_watch() as many times as this * function is called. * + * There can only be a single bus watch per bus, you most remove all signal watch + * before you can set another type of watch. + * * MT safe. */ #ifdef __SYMBIAN32__ @@ -1263,17 +1296,30 @@ if (bus->num_signal_watchers > 0) goto done; + /* this should not fail because the counter above takes care of it */ g_assert (bus->signal_watch_id == 0); bus->signal_watch_id = - gst_bus_add_watch_full (bus, priority, gst_bus_async_signal_func, NULL, - NULL); + gst_bus_add_watch_full_unlocked (bus, priority, gst_bus_async_signal_func, + NULL, NULL); + + if (G_UNLIKELY (bus->signal_watch_id == 0)) + goto add_failed; done: bus->num_signal_watchers++; GST_OBJECT_UNLOCK (bus); + return; + + /* ERRORS */ +add_failed: + { + g_critical ("Could not add signal watch to bus %s", GST_OBJECT_NAME (bus)); + GST_OBJECT_UNLOCK (bus); + return; + } } /** @@ -1295,7 +1341,6 @@ EXPORT_C #endif - void gst_bus_add_signal_watch (GstBus * bus) { @@ -1317,6 +1362,8 @@ void gst_bus_remove_signal_watch (GstBus * bus) { + guint id = 0; + g_return_if_fail (GST_IS_BUS (bus)); /* I know the callees don't take this lock, so go ahead and abuse it */ @@ -1330,13 +1377,20 @@ if (bus->num_signal_watchers > 0) goto done; - g_source_remove (bus->signal_watch_id); + id = bus->signal_watch_id; bus->signal_watch_id = 0; + GST_DEBUG_OBJECT (bus, "removing signal watch %u", id); + done: GST_OBJECT_UNLOCK (bus); + + if (id) + g_source_remove (id); + return; + /* ERRORS */ error: { g_critical ("Bus %s has no signal watches attached", GST_OBJECT_NAME (bus));