--- a/gstreamer_core/gst/gstbus.c Wed Mar 31 22:03:18 2010 +0300
+++ b/gstreamer_core/gst/gstbus.c Tue Aug 31 15:30:33 2010 +0300
@@ -95,51 +95,20 @@
static void gst_bus_init (GstBus * bus);
static void gst_bus_dispose (GObject * object);
-static void gst_bus_set_property (GObject * object, guint prop_id,
- const GValue * value, GParamSpec * pspec);
-static void gst_bus_get_property (GObject * object, guint prop_id,
- GValue * value, GParamSpec * pspec);
+static void gst_bus_set_main_context (GstBus * bus, GMainContext * ctx);
static GstObjectClass *parent_class = NULL;
static guint gst_bus_signals[LAST_SIGNAL] = { 0 };
-/* the context we wakeup when we posted a message on the bus */
-static GMainContext *main_context;
-
struct _GstBusPrivate
{
guint num_sync_message_emitters;
-
GCond *queue_cond;
+ GSource *watch_id;
+ GMainContext *main_context;
};
-#ifdef __SYMBIAN32__
-EXPORT_C
-#endif
-
-
-GType
-gst_bus_get_type (void)
-{
- static GType bus_type = 0;
- if (G_UNLIKELY (bus_type == 0)) {
- static const GTypeInfo bus_info = {
- sizeof (GstBusClass),
- NULL,
- NULL,
- (GClassInitFunc) gst_bus_class_init,
- NULL,
- NULL,
- sizeof (GstBus),
- 0,
- (GInstanceInitFunc) gst_bus_init,
- NULL
- };
-
- bus_type = g_type_register_static (GST_TYPE_OBJECT, "GstBus", &bus_info, 0);
- }
- return bus_type;
-}
+G_DEFINE_TYPE (GstBus, gst_bus, GST_TYPE_OBJECT);
/* fixme: do something about this */
static void
@@ -163,8 +132,8 @@
data2 = closure->data;
}
callback =
- (marshalfunc_VOID__MINIOBJECT) (marshal_data ? marshal_data : cc->
- callback);
+ (marshalfunc_VOID__MINIOBJECT) (marshal_data ? marshal_data :
+ cc->callback);
callback (data1, gst_value_get_mini_object (param_values + 1), data2);
}
@@ -172,20 +141,11 @@
static void
gst_bus_class_init (GstBusClass * klass)
{
- GObjectClass *gobject_class;
- GstObjectClass *gstobject_class;
-
- gobject_class = (GObjectClass *) klass;
- gstobject_class = (GstObjectClass *) klass;
+ GObjectClass *gobject_class = (GObjectClass *) klass;
parent_class = g_type_class_peek_parent (klass);
- if (!g_thread_supported ())
- g_thread_init (NULL);
-
gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_bus_dispose);
- gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_bus_set_property);
- gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_bus_get_property);
/**
* GstBus::sync-message:
@@ -223,8 +183,6 @@
G_STRUCT_OFFSET (GstBusClass, message), NULL, NULL,
marshal_VOID__MINIOBJECT, G_TYPE_NONE, 1, GST_TYPE_MESSAGE);
- main_context = g_main_context_default ();
-
g_type_class_add_private (klass, sizeof (GstBusPrivate));
}
@@ -243,9 +201,7 @@
static void
gst_bus_dispose (GObject * object)
{
- GstBus *bus;
-
- bus = GST_BUS (object);
+ GstBus *bus = GST_BUS (object);
if (bus->queue) {
GstMessage *message;
@@ -265,37 +221,48 @@
bus->priv->queue_cond = NULL;
}
+ if (bus->priv->main_context) {
+ g_main_context_unref (bus->priv->main_context);
+ bus->priv->main_context = NULL;
+ }
+
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static void
-gst_bus_set_property (GObject * object, guint prop_id,
- const GValue * value, GParamSpec * pspec)
+gst_bus_wakeup_main_context (GstBus * bus)
{
- GstBus *bus;
+ GMainContext *ctx;
- bus = GST_BUS (object);
+ GST_OBJECT_LOCK (bus);
+ if ((ctx = bus->priv->main_context))
+ g_main_context_ref (ctx);
+ GST_OBJECT_UNLOCK (bus);
- switch (prop_id) {
- default:
- G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
- break;
- }
+ g_main_context_wakeup (ctx);
+
+ if (ctx)
+ g_main_context_unref (ctx);
}
static void
-gst_bus_get_property (GObject * object, guint prop_id,
- GValue * value, GParamSpec * pspec)
+gst_bus_set_main_context (GstBus * bus, GMainContext * ctx)
{
- GstBus *bus;
+ GST_OBJECT_LOCK (bus);
+
+ if (bus->priv->main_context != NULL) {
+ g_main_context_unref (bus->priv->main_context);
+ bus->priv->main_context = NULL;
+ }
- bus = GST_BUS (object);
+ if (ctx != NULL) {
+ bus->priv->main_context = g_main_context_ref (ctx);
+ }
- switch (prop_id) {
- default:
- G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
- break;
- }
+ GST_DEBUG_OBJECT (bus, "setting main context to %p, GLib default context: %p",
+ ctx, g_main_context_default ());
+
+ GST_OBJECT_UNLOCK (bus);
}
/**
@@ -387,8 +354,7 @@
g_mutex_unlock (bus->queue_lock);
GST_DEBUG_OBJECT (bus, "[msg %p] pushed on async queue", message);
- /* FIXME cannot assume sources are only in the default context */
- g_main_context_wakeup (main_context);
+ gst_bus_wakeup_main_context (bus);
break;
case GST_BUS_ASYNC:
@@ -412,8 +378,7 @@
g_cond_broadcast (bus->priv->queue_cond);
g_mutex_unlock (bus->queue_lock);
- /* FIXME cannot assume sources are only in the default context */
- g_main_context_wakeup (main_context);
+ gst_bus_wakeup_main_context (bus);
/* now block till the message is freed */
g_cond_wait (cond, lock);
@@ -781,6 +746,7 @@
{
GSource source;
GstBus *bus;
+ gboolean inited;
} GstBusSource;
static gboolean
@@ -788,6 +754,14 @@
{
GstBusSource *bsrc = (GstBusSource *) source;
+ /* we do this here now that we know that we're attached to a main context
+ * (we don't support detaching a source from a main context and then
+ * re-attaching it to a different main context) */
+ if (G_UNLIKELY (!bsrc->inited)) {
+ gst_bus_set_main_context (bsrc->bus, g_source_get_context (source));
+ bsrc->inited = TRUE;
+ }
+
*timeout = -1;
return gst_bus_have_pending (bsrc->bus);
}
@@ -817,7 +791,11 @@
g_return_val_if_fail (GST_IS_BUS (bus), FALSE);
message = gst_bus_pop (bus);
- g_return_val_if_fail (message != NULL, FALSE);
+
+ /* The message queue might be empty if some other thread or callback set
+ * the bus to flushing between check/prepare and dispatch */
+ if (G_UNLIKELY (message == NULL))
+ return TRUE;
if (!handler)
goto no_handler;
@@ -844,7 +822,18 @@
gst_bus_source_finalize (GSource * source)
{
GstBusSource *bsource = (GstBusSource *) source;
+ GstBus *bus;
+ bus = bsource->bus;
+
+ GST_DEBUG_OBJECT (bus, "finalize source %p", source);
+
+ GST_OBJECT_LOCK (bus);
+ if (bus->priv->watch_id == source)
+ bus->priv->watch_id = NULL;
+ GST_OBJECT_UNLOCK (bus);
+
+ gst_bus_set_main_context (bsource->bus, NULL);
gst_object_unref (bsource->bus);
bsource->bus = NULL;
}
@@ -879,12 +868,44 @@
source = (GstBusSource *) g_source_new (&gst_bus_source_funcs,
sizeof (GstBusSource));
- gst_object_ref (bus);
- source->bus = bus;
+ source->bus = gst_object_ref (bus);
+ source->inited = FALSE;
return (GSource *) source;
}
+/* must be called with the bus OBJECT LOCK */
+static guint
+gst_bus_add_watch_full_unlocked (GstBus * bus, gint priority,
+ GstBusFunc func, gpointer user_data, GDestroyNotify notify)
+{
+ guint id;
+ GSource *source;
+
+ if (bus->priv->watch_id) {
+ GST_ERROR_OBJECT (bus,
+ "Tried to add new watch while one was already there");
+ return 0;
+ }
+
+ source = gst_bus_create_watch (bus);
+
+ if (priority != G_PRIORITY_DEFAULT)
+ g_source_set_priority (source, priority);
+
+ g_source_set_callback (source, (GSourceFunc) func, user_data, notify);
+
+ id = g_source_attach (source, NULL);
+ g_source_unref (source);
+
+ if (id) {
+ bus->priv->watch_id = source;
+ }
+
+ GST_DEBUG_OBJECT (bus, "New source %p with id %u", source, id);
+ return id;
+}
+
/**
* gst_bus_add_watch_full:
* @bus: a #GstBus to create the watch for.
@@ -895,6 +916,8 @@
*
* Adds a bus watch to the default main context with the given @priority.
* This function is used to receive asynchronous messages in the main loop.
+ * There can only be a single bus watch per bus, you must remove it before you
+ * can set a new one.
*
* When @func is called, the message belongs to the caller; if you want to
* keep a copy of it, call gst_message_ref() before leaving @func.
@@ -915,21 +938,13 @@
GstBusFunc func, gpointer user_data, GDestroyNotify notify)
{
guint id;
- GSource *source;
g_return_val_if_fail (GST_IS_BUS (bus), 0);
- source = gst_bus_create_watch (bus);
-
- if (priority != G_PRIORITY_DEFAULT)
- g_source_set_priority (source, priority);
+ GST_OBJECT_LOCK (bus);
+ id = gst_bus_add_watch_full_unlocked (bus, priority, func, user_data, notify);
+ GST_OBJECT_UNLOCK (bus);
- g_source_set_callback (source, (GSourceFunc) func, user_data, notify);
-
- id = g_source_attach (source, NULL);
- g_source_unref (source);
-
- GST_DEBUG_OBJECT (bus, "New source %p", source);
return id;
}
@@ -941,6 +956,8 @@
*
* Adds a bus watch to the default main context with the default priority.
* This function is used to receive asynchronous messages in the main loop.
+ * There can only be a single bus watch per bus, you must remove it before you
+ * can set a new one.
*
* The watch can be removed using g_source_remove() or by returning FALSE
* from @func.
@@ -1028,7 +1045,8 @@
* @bus: a #GstBus
* @events: a mask of #GstMessageType, representing the set of message types to
* poll for.
- * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll indefinitely.
+ * @timeout: the poll timeout, as a #GstClockTimeDiff, or -1 to poll
+ * indefinitely.
*
* Poll the bus for messages. Will block while waiting for messages to come.
* You can specify a maximum time to poll with the @timeout parameter. If
@@ -1045,6 +1063,23 @@
* This function will run a main loop from the default main context when
* polling.
*
+ * You should never use this function, since it is pure evil. This is
+ * especially true for GUI applications based on Gtk+ or Qt, but also for any
+ * other non-trivial application that uses the GLib main loop. As this function
+ * runs a GLib main loop, any callback attached to the default GLib main
+ * context may be invoked. This could be timeouts, GUI events, I/O events etc.;
+ * even if gst_bus_poll() is called with a 0 timeout. Any of these callbacks
+ * may do things you do not expect, e.g. destroy the main application window or
+ * some other resource; change other application state; display a dialog and
+ * run another main loop until the user clicks it away. In short, using this
+ * function may add a lot of complexity to your code through unexpected
+ * re-entrancy and unexpected changes to your application's state.
+ *
+ * For 0 timeouts use gst_bus_pop_filtered() instead of this function; for
+ * other short timeouts use gst_bus_timed_pop_filtered(); everything else is
+ * better handled by setting up an asynchronous bus watch and doing things
+ * from there.
+ *
* Returns: The message that was received, or NULL if the poll timed out.
* The message is taken from the bus and needs to be unreffed with
* gst_message_unref() after usage.
@@ -1192,9 +1227,7 @@
g_return_if_fail (GST_IS_BUS (bus));
GST_OBJECT_LOCK (bus);
-
bus->priv->num_sync_message_emitters++;
-
GST_OBJECT_UNLOCK (bus);
}
@@ -1223,13 +1256,10 @@
gst_bus_disable_sync_message_emission (GstBus * bus)
{
g_return_if_fail (GST_IS_BUS (bus));
-
g_return_if_fail (bus->num_signal_watchers == 0);
GST_OBJECT_LOCK (bus);
-
bus->priv->num_sync_message_emitters--;
-
GST_OBJECT_UNLOCK (bus);
}
@@ -1246,6 +1276,9 @@
* responsible for calling gst_bus_remove_signal_watch() as many times as this
* function is called.
*
+ * There can only be a single bus watch per bus, you most remove all signal watch
+ * before you can set another type of watch.
+ *
* MT safe.
*/
#ifdef __SYMBIAN32__
@@ -1263,17 +1296,30 @@
if (bus->num_signal_watchers > 0)
goto done;
+ /* this should not fail because the counter above takes care of it */
g_assert (bus->signal_watch_id == 0);
bus->signal_watch_id =
- gst_bus_add_watch_full (bus, priority, gst_bus_async_signal_func, NULL,
- NULL);
+ gst_bus_add_watch_full_unlocked (bus, priority, gst_bus_async_signal_func,
+ NULL, NULL);
+
+ if (G_UNLIKELY (bus->signal_watch_id == 0))
+ goto add_failed;
done:
bus->num_signal_watchers++;
GST_OBJECT_UNLOCK (bus);
+ return;
+
+ /* ERRORS */
+add_failed:
+ {
+ g_critical ("Could not add signal watch to bus %s", GST_OBJECT_NAME (bus));
+ GST_OBJECT_UNLOCK (bus);
+ return;
+ }
}
/**
@@ -1295,7 +1341,6 @@
EXPORT_C
#endif
-
void
gst_bus_add_signal_watch (GstBus * bus)
{
@@ -1317,6 +1362,8 @@
void
gst_bus_remove_signal_watch (GstBus * bus)
{
+ guint id = 0;
+
g_return_if_fail (GST_IS_BUS (bus));
/* I know the callees don't take this lock, so go ahead and abuse it */
@@ -1330,13 +1377,20 @@
if (bus->num_signal_watchers > 0)
goto done;
- g_source_remove (bus->signal_watch_id);
+ id = bus->signal_watch_id;
bus->signal_watch_id = 0;
+ GST_DEBUG_OBJECT (bus, "removing signal watch %u", id);
+
done:
GST_OBJECT_UNLOCK (bus);
+
+ if (id)
+ g_source_remove (id);
+
return;
+ /* ERRORS */
error:
{
g_critical ("Bus %s has no signal watches attached", GST_OBJECT_NAME (bus));