glib/libglib/src/gthreadpool.c
changeset 0 e4d67989cc36
equal deleted inserted replaced
-1:000000000000 0:e4d67989cc36
       
     1 /* GLIB - Library of useful routines for C programming
       
     2  * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
       
     3  *
       
     4  * GAsyncQueue: thread pool implementation.
       
     5  * Copyright (C) 2000 Sebastian Wilhelmi; University of Karlsruhe
       
     6  * Portions copyright (c) 2006 Nokia Corporation.  All rights reserved.
       
     7  *
       
     8  * This library is free software; you can redistribute it and/or
       
     9  * modify it under the terms of the GNU Lesser General Public
       
    10  * License as published by the Free Software Foundation; either
       
    11  * version 2 of the License, or (at your option) any later version.
       
    12  *
       
    13  * This library is distributed in the hope that it will be useful,
       
    14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
       
    15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
       
    16  * Lesser General Public License for more details.
       
    17  *
       
    18  * You should have received a copy of the GNU Lesser General Public
       
    19  * License along with this library; if not, write to the
       
    20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
       
    21  * Boston, MA 02111-1307, USA.
       
    22  */
       
    23 
       
    24 /*
       
    25  * MT safe
       
    26  */
       
    27 
       
    28 #include "config.h"
       
    29 
       
    30 #include "glib.h"
       
    31 #include "galias.h"
       
    32 
       
    33 #ifdef __SYMBIAN32__
       
    34 #include <glib_wsd.h>
       
    35 #endif /* __SYMBIAN32__ */
       
    36 
       
    /* In EMULATOR builds the two thread-support globals below are not
     * referenced directly; each name expands to a dereference of the pointer
     * returned by the matching accessor function.
     */
    #if EMULATOR
    #define g_thread_functions_for_glib_use (*_g_thread_functions_for_glib_use())
    #define g_thread_use_default_impl (*_g_thread_use_default_impl())
    #endif /* EMULATOR */

    /* Debug tracing hook.  It expands to nothing by default.  Call sites pass
     * a parenthesised g_printerr() argument list, so tracing can be switched
     * on by defining the macro like this instead:
     *
     *   #define DEBUG_MSG(args) g_printerr args ; g_printerr ("\n");
     */
    #define DEBUG_MSG(args)
       
    44 
       
    45 
       
    46 typedef struct _GRealThreadPool GRealThreadPool;
       
    47 
       
    48 struct _GRealThreadPool
       
    49 {
       
    50   GThreadPool pool;
       
    51   GAsyncQueue* queue;
       
    52   GCond* cond;
       
    53   gint max_threads;
       
    54   gint num_threads;
       
    55   gboolean running;
       
    56   gboolean immediate;
       
    57   gboolean waiting;
       
    58   GCompareDataFunc sort_func;
       
    59   gpointer sort_user_data;
       
    60 };
       
    61 
       
    62 /* The following is just an address to mark the wakeup order for a
       
    63  * thread, it could be any address (as long, as it isn't a valid
       
    64  * GThreadPool address) */
       
    65 #if EMULATOR
       
    66 
       
    67 PLS(wakeup_thread_serial,gthreadpool,gint)
       
    68 PLS(unused_thread_queue,gthreadpool,GAsyncQueue *)
       
    69 PLS(unused_threads,gthreadpool,gint)
       
    70 PLS(max_unused_threads,gthreadpool,gint)
       
    71 PLS(kill_unused_threads,gthreadpool,gint)
       
    72 PLS(max_idle_time,gthreadpool,guint)
       
    73 
       
    74 #define wakeup_thread_serial (*FUNCTION_NAME(wakeup_thread_serial,gthreadpool)())
       
    75 #define unused_thread_queue (*FUNCTION_NAME(unused_thread_queue,gthreadpool)())
       
    76 #define unused_threads (*FUNCTION_NAME(unused_threads,gthreadpool)())
       
    77 #define max_unused_threads (*FUNCTION_NAME(max_unused_threads,gthreadpool)())
       
    78 #define kill_unused_threads (*FUNCTION_NAME(kill_unused_threads,gthreadpool)())
       
    79 #define max_idle_time (*FUNCTION_NAME(max_idle_time,gthreadpool)())
       
    80 
       
    81 #else 
       
    82 
       
    83 static gint wakeup_thread_serial = 0;
       
    84 
       
    85 /* Here all unused threads are waiting  */
       
    86 static GAsyncQueue *unused_thread_queue = NULL;
       
    87 static gint unused_threads = 0;
       
    88 static gint max_unused_threads = 0;
       
    89 static gint kill_unused_threads = 0;
       
    90 static guint max_idle_time = 0;
       
    91 
       
    92 #endif /* EMULATOR */
       
    93 
       
    94 static gconstpointer const wakeup_thread_marker = (gconstpointer) &g_thread_pool_new;
       
    95 
       
    96 static void             g_thread_pool_queue_push_unlocked (GRealThreadPool  *pool,
       
    97 							   gpointer          data);
       
    98 static void             g_thread_pool_free_internal       (GRealThreadPool  *pool);
       
    99 static gpointer         g_thread_pool_thread_proxy        (gpointer          data);
       
   100 static void             g_thread_pool_start_thread        (GRealThreadPool  *pool,
       
   101 							   GError          **error);
       
   102 static void             g_thread_pool_wakeup_and_stop_all (GRealThreadPool  *pool);
       
   103 static GRealThreadPool* g_thread_pool_wait_for_new_pool   (void);
       
   104 static gpointer         g_thread_pool_wait_for_new_task   (GRealThreadPool  *pool);
       
   105 
       
   106 static void
       
   107 g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
       
   108 				   gpointer         data)
       
   109 {
       
   110   if (pool->sort_func) 
       
   111     g_async_queue_push_sorted_unlocked (pool->queue, 
       
   112 					data,
       
   113 					pool->sort_func, 
       
   114 					pool->sort_user_data);
       
   115   else
       
   116     g_async_queue_push_unlocked (pool->queue, data);
       
   117 }
       
   118 
       
   119 static GRealThreadPool*
       
   120 g_thread_pool_wait_for_new_pool (void)
       
   121 {
       
   122   GRealThreadPool *pool;
       
   123   gint local_wakeup_thread_serial;
       
   124   guint local_max_unused_threads;
       
   125   gint local_max_idle_time;
       
   126   gint last_wakeup_thread_serial;
       
   127   gboolean have_relayed_thread_marker = FALSE;
       
   128 
       
   129   local_max_unused_threads = g_atomic_int_get (&max_unused_threads);
       
   130   local_max_idle_time = g_atomic_int_get (&max_idle_time);
       
   131   last_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
       
   132 
       
   133   g_atomic_int_inc (&unused_threads);
       
   134 
       
   135   do
       
   136     {
       
   137       if (g_atomic_int_get (&unused_threads) >= local_max_unused_threads)
       
   138 	{
       
   139 	  /* If this is a superfluous thread, stop it. */
       
   140 	  pool = NULL;
       
   141 	}
       
   142       else if (local_max_idle_time > 0)
       
   143 	{
       
   144 	  /* If a maximal idle time is given, wait for the given time. */
       
   145 	  GTimeVal end_time;
       
   146 
       
   147 	  g_get_current_time (&end_time);
       
   148 	  g_time_val_add (&end_time, local_max_idle_time * 1000);
       
   149 
       
   150 	  DEBUG_MSG (("thread %p waiting in global pool for %f seconds.",
       
   151 		      g_thread_self (), local_max_idle_time / 1000.0));
       
   152 
       
   153 	  pool = g_async_queue_timed_pop (unused_thread_queue, &end_time);
       
   154 	}
       
   155       else
       
   156 	{
       
   157 	  /* If no maximal idle time is given, wait indefinitely. */
       
   158 	  DEBUG_MSG (("thread %p waiting in global pool.",
       
   159 		      g_thread_self ()));
       
   160 	  pool = g_async_queue_pop (unused_thread_queue);
       
   161 	}
       
   162 
       
   163       if (pool == wakeup_thread_marker)
       
   164 	{
       
   165 	  local_wakeup_thread_serial = g_atomic_int_get (&wakeup_thread_serial);
       
   166 	  if (last_wakeup_thread_serial == local_wakeup_thread_serial)
       
   167 	    {
       
   168 	      if (!have_relayed_thread_marker)
       
   169 	      {
       
   170 		/* If this wakeup marker has been received for
       
   171 		 * the second time, relay it. 
       
   172 		 */
       
   173 		DEBUG_MSG (("thread %p relaying wakeup message to "
       
   174 			    "waiting thread with lower serial.",
       
   175 			    g_thread_self ()));
       
   176 
       
   177 		g_async_queue_push (unused_thread_queue, FIX_CASTING(void *)wakeup_thread_marker);
       
   178 		have_relayed_thread_marker = TRUE;
       
   179 
       
   180 		/* If a wakeup marker has been relayed, this thread
       
   181 		 * will get out of the way for 100 microseconds to
       
   182 		 * avoid receiving this marker again. */
       
   183 		g_usleep (100);
       
   184 	      }
       
   185 	    }
       
   186 	  else
       
   187 	    {
       
   188 	      if (g_atomic_int_exchange_and_add (&kill_unused_threads, -1) > 0)
       
   189 	        {
       
   190 		  pool = NULL;
       
   191 		  break;
       
   192 		}
       
   193 
       
   194 	      DEBUG_MSG (("thread %p updating to new limits.",
       
   195 			  g_thread_self ()));
       
   196 
       
   197 	      local_max_unused_threads = g_atomic_int_get (&max_unused_threads);
       
   198 	      local_max_idle_time = g_atomic_int_get (&max_idle_time);
       
   199 	      last_wakeup_thread_serial = local_wakeup_thread_serial;
       
   200 
       
   201 	      have_relayed_thread_marker = FALSE;
       
   202 	    }
       
   203 	}
       
   204     }
       
   205   while (pool == wakeup_thread_marker);
       
   206 
       
   207   g_atomic_int_add (&unused_threads, -1);
       
   208 
       
   209   return pool;
       
   210 }
       
   211 
       
   212 static gpointer
       
   213 g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
       
   214 {
       
   215   gpointer task = NULL;
       
   216 
       
   217   if (pool->running || (!pool->immediate &&
       
   218 			g_async_queue_length_unlocked (pool->queue) > 0))
       
   219     {
       
   220       /* This thread pool is still active. */
       
   221       if (pool->num_threads > pool->max_threads && pool->max_threads != -1)
       
   222 	{
       
   223 	  /* This is a superfluous thread, so it goes to the global pool. */
       
   224 	  DEBUG_MSG (("superfluous thread %p in pool %p.",
       
   225 		      g_thread_self (), pool));
       
   226 	}
       
   227       else if (pool->pool.exclusive)
       
   228 	{
       
   229 	  /* Exclusive threads stay attached to the pool. */
       
   230 	  task = g_async_queue_pop_unlocked (pool->queue);
       
   231 
       
   232 	  DEBUG_MSG (("thread %p in exclusive pool %p waits for task "
       
   233 		      "(%d running, %d unprocessed).",
       
   234 		      g_thread_self (), pool, pool->num_threads,
       
   235 		      g_async_queue_length_unlocked (pool->queue)));
       
   236 	}
       
   237       else
       
   238 	{
       
   239 	  /* A thread will wait for new tasks for at most 1/2
       
   240 	   * second before going to the global pool.
       
   241 	   */
       
   242 	  GTimeVal end_time;
       
   243 
       
   244 	  g_get_current_time (&end_time);
       
   245 	  g_time_val_add (&end_time, G_USEC_PER_SEC / 2);	/* 1/2 second */
       
   246 
       
   247 	  DEBUG_MSG (("thread %p in pool %p waits for up to a 1/2 second for task "
       
   248 		      "(%d running, %d unprocessed).",
       
   249 		      g_thread_self (), pool, pool->num_threads,
       
   250 		      g_async_queue_length_unlocked (pool->queue)));
       
   251 
       
   252 	  task = g_async_queue_timed_pop_unlocked (pool->queue, &end_time);
       
   253 	}
       
   254     }
       
   255   else
       
   256     {
       
   257       /* This thread pool is inactive, it will no longer process tasks. */
       
   258       DEBUG_MSG (("pool %p not active, thread %p will go to global pool "
       
   259 		  "(running: %s, immediate: %s, len: %d).",
       
   260 		  pool, g_thread_self (),
       
   261 		  pool->running ? "true" : "false",
       
   262 		  pool->immediate ? "true" : "false",
       
   263 		  g_async_queue_length_unlocked (pool->queue)));
       
   264     }
       
   265 
       
   266   return task;
       
   267 }
       
   268 
       
   269 
       
   270 static gpointer 
       
   271 g_thread_pool_thread_proxy (gpointer data)
       
   272 {
       
   273   GRealThreadPool *pool;
       
   274 
       
   275   pool = data;
       
   276 
       
   277   DEBUG_MSG (("thread %p started for pool %p.", 
       
   278 	      g_thread_self (), pool));
       
   279 
       
   280   g_async_queue_lock (pool->queue);
       
   281 
       
   282   while (TRUE)
       
   283     {
       
   284       gpointer task;
       
   285 
       
   286       task = g_thread_pool_wait_for_new_task (pool);
       
   287       if (task)
       
   288 	{
       
   289 	  if (pool->running || !pool->immediate)
       
   290 	    {
       
   291 	      /* A task was received and the thread pool is active, so
       
   292 	       * execute the function. 
       
   293 	       */
       
   294 	      g_async_queue_unlock (pool->queue);
       
   295 	      DEBUG_MSG (("thread %p in pool %p calling func.", 
       
   296 			  g_thread_self (), pool));
       
   297 	      pool->pool.func (task, pool->pool.user_data);
       
   298 	      g_async_queue_lock (pool->queue);
       
   299 	    }
       
   300 	}
       
   301       else
       
   302 	{
       
   303 	  /* No task was received, so this thread goes to the global
       
   304 	   * pool. 
       
   305 	   */
       
   306 	  gboolean free_pool = FALSE;
       
   307  
       
   308 	  DEBUG_MSG (("thread %p leaving pool %p for global pool.", 
       
   309 		      g_thread_self (), pool));
       
   310 	  pool->num_threads--;
       
   311 
       
   312 	  if (!pool->running)
       
   313 	    {
       
   314 	      if (!pool->waiting)
       
   315 		{
       
   316 		  if (pool->num_threads == 0)
       
   317 		    {
       
   318 		      /* If the pool is not running and no other
       
   319 		       * thread is waiting for this thread pool to
       
   320 		       * finish and this is the last thread of this
       
   321 		       * pool, free the pool.
       
   322 		       */
       
   323 		      free_pool = TRUE;
       
   324 		    }		
       
   325 		  else 
       
   326 		    {
       
   327 		      /* If the pool is not running and no other
       
   328 		       * thread is waiting for this thread pool to
       
   329 		       * finish and this is not the last thread of
       
   330 		       * this pool and there are no tasks left in the
       
   331 		       * queue, wakeup the remaining threads. 
       
   332 		       */
       
   333 		      if (g_async_queue_length_unlocked (pool->queue) == 
       
   334 			  - pool->num_threads)
       
   335 			g_thread_pool_wakeup_and_stop_all (pool);
       
   336 		    }
       
   337 		}
       
   338 	      else if (pool->immediate || 
       
   339 		       g_async_queue_length_unlocked (pool->queue) <= 0)
       
   340 		{
       
   341 		  /* If the pool is not running and another thread is
       
   342 		   * waiting for this thread pool to finish and there
       
   343 		   * are either no tasks left or the pool shall stop
       
   344 		   * immediatly, inform the waiting thread of a change
       
   345 		   * of the thread pool state. 
       
   346 		   */
       
   347 		  g_cond_broadcast (pool->cond);
       
   348 		}
       
   349 	    }
       
   350 
       
   351 	  g_async_queue_unlock (pool->queue);
       
   352 
       
   353 	  if (free_pool)
       
   354 	    g_thread_pool_free_internal (pool);
       
   355 
       
   356 	  if ((pool = g_thread_pool_wait_for_new_pool ()) == NULL) 
       
   357 	    break;
       
   358 
       
   359 	  g_async_queue_lock (pool->queue);
       
   360 	  
       
   361 	  DEBUG_MSG (("thread %p entering pool %p from global pool.", 
       
   362 		      g_thread_self (), pool));
       
   363 
       
   364 	  /* pool->num_threads++ is not done here, but in
       
   365            * g_thread_pool_start_thread to make the new started thread
       
   366            * known to the pool, before itself can do it. 
       
   367 	   */
       
   368 	}
       
   369     }
       
   370 
       
   371   return NULL;
       
   372 }
       
   373 
       
   374 static void
       
   375 g_thread_pool_start_thread (GRealThreadPool  *pool, 
       
   376 			    GError          **error)
       
   377 {
       
   378   gboolean success = FALSE;
       
   379   
       
   380   if (pool->num_threads >= pool->max_threads && pool->max_threads != -1)
       
   381     /* Enough threads are already running */
       
   382     return;
       
   383 
       
   384   g_async_queue_lock (unused_thread_queue);
       
   385 
       
   386   if (g_async_queue_length_unlocked (unused_thread_queue) < 0)
       
   387     {
       
   388       g_async_queue_push_unlocked (unused_thread_queue, pool);
       
   389       success = TRUE;
       
   390     }
       
   391 
       
   392   g_async_queue_unlock (unused_thread_queue);
       
   393 
       
   394   if (!success)
       
   395     {
       
   396       GError *local_error = NULL;
       
   397       /* No thread was found, we have to start a new one */
       
   398       g_thread_create (g_thread_pool_thread_proxy, pool, FALSE, &local_error);
       
   399       
       
   400       if (local_error)
       
   401 	{
       
   402 	  g_propagate_error (error, local_error);
       
   403 	  return;
       
   404 	}
       
   405     }
       
   406 
       
   407   /* See comment in g_thread_pool_thread_proxy as to why this is done
       
   408    * here and not there
       
   409    */
       
   410   pool->num_threads++;
       
   411 }
       
   412 
       
   413 /**
       
   414  * g_thread_pool_new: 
       
   415  * @func: a function to execute in the threads of the new thread pool
       
   416  * @user_data: user data that is handed over to @func every time it 
       
   417  *   is called
       
   418  * @max_threads: the maximal number of threads to execute concurrently in 
       
   419  *   the new thread pool, -1 means no limit
       
   420  * @exclusive: should this thread pool be exclusive?
       
   421  * @error: return location for error
       
   422  *
       
   423  * This function creates a new thread pool.
       
   424  *
       
   425  * Whenever you call g_thread_pool_push(), either a new thread is
       
   426  * created or an unused one is reused. At most @max_threads threads
       
   427  * are running concurrently for this thread pool. @max_threads = -1
       
   428  * allows unlimited threads to be created for this thread pool. The
       
   429  * newly created or reused thread now executes the function @func with
       
   430  * the two arguments. The first one is the parameter to
       
   431  * g_thread_pool_push() and the second one is @user_data.
       
   432  *
       
   433  * The parameter @exclusive determines, whether the thread pool owns
       
   434  * all threads exclusive or whether the threads are shared
       
   435  * globally. If @exclusive is %TRUE, @max_threads threads are started
       
   436  * immediately and they will run exclusively for this thread pool until
       
   437  * it is destroyed by g_thread_pool_free(). If @exclusive is %FALSE,
       
   438  * threads are created, when needed and shared between all
       
   439  * non-exclusive thread pools. This implies that @max_threads may not
       
   440  * be -1 for exclusive thread pools.
       
   441  *
       
   442  * @error can be %NULL to ignore errors, or non-%NULL to report
       
   443  * errors. An error can only occur when @exclusive is set to %TRUE and
       
   444  * not all @max_threads threads could be created.
       
   445  *
       
   446  * Return value: the new #GThreadPool
       
   447  **/
       
   448  
       
   449 #if EMULATOR
       
   450 
       
   451 PLS_MACRO(init,g_thread_pool_new,GStaticMutex)
       
   452 #define g__init_lock (*FUNCTION_NAME_MACRO(init,g_thread_pool_new)())
       
   453 
       
   454 #endif /* EMULATOR */
       
   455 
       
   456 EXPORT_C GThreadPool* 
       
   457 g_thread_pool_new (GFunc            func,
       
   458 		   gpointer         user_data,
       
   459 		   gint             max_threads,
       
   460 		   gboolean         exclusive,
       
   461 		   GError         **error)
       
   462 {
       
   463   GRealThreadPool *retval;
       
   464   
       
   465   #if !(EMULATOR)
       
   466   G_LOCK_DEFINE_STATIC (init);
       
   467   #endif /* !(EMULATOR) */
       
   468 
       
   469   g_return_val_if_fail (func, NULL);
       
   470   g_return_val_if_fail (!exclusive || max_threads != -1, NULL);
       
   471   g_return_val_if_fail (max_threads >= -1, NULL);
       
   472   g_return_val_if_fail (g_thread_supported (), NULL);
       
   473   retval = g_new (GRealThreadPool, 1);
       
   474 
       
   475   retval->pool.func = func;
       
   476   retval->pool.user_data = user_data;
       
   477   retval->pool.exclusive = exclusive;
       
   478   retval->queue = g_async_queue_new ();
       
   479   retval->cond = NULL;
       
   480   retval->max_threads = max_threads;
       
   481   retval->num_threads = 0;
       
   482   retval->running = TRUE;
       
   483   retval->sort_func = NULL;
       
   484   retval->sort_user_data = NULL;
       
   485 
       
   486   G_LOCK (init);
       
   487   if (!unused_thread_queue)
       
   488       unused_thread_queue = g_async_queue_new ();
       
   489   G_UNLOCK (init);
       
   490 
       
   491   if (retval->pool.exclusive)
       
   492     {
       
   493       g_async_queue_lock (retval->queue);
       
   494   
       
   495       while (retval->num_threads < retval->max_threads)
       
   496 	{
       
   497 	  GError *local_error = NULL;
       
   498 	  g_thread_pool_start_thread (retval, &local_error);
       
   499 	  if (local_error)
       
   500 	    {
       
   501 	      g_propagate_error (error, local_error);
       
   502 	      break;
       
   503 	    }
       
   504 	}
       
   505 
       
   506       g_async_queue_unlock (retval->queue);
       
   507     }
       
   508 
       
   509   return (GThreadPool*) retval;
       
   510 }
       
   511 
       
   512 #if EMULATOR
       
   513 #undef init
       
   514 #endif /* EMULATOR */
       
   515 
       
   516 /**
       
   517  * g_thread_pool_push:
       
   518  * @pool: a #GThreadPool
       
   519  * @data: a new task for @pool
       
   520  * @error: return location for error
       
   521  * 
       
   522  * Inserts @data into the list of tasks to be executed by @pool. When
       
   523  * the number of currently running threads is lower than the maximal
       
   524  * allowed number of threads, a new thread is started (or reused) with
       
   525  * the properties given to g_thread_pool_new (). Otherwise @data stays
       
   526  * in the queue until a thread in this pool finishes its previous task
       
   527  * and processes @data. 
       
   528  *
       
   529  * @error can be %NULL to ignore errors, or non-%NULL to report
       
   530  * errors. An error can only occur when a new thread couldn't be
       
   531  * created. In that case @data is simply appended to the queue of work
       
   532  * to do.  
       
   533  **/
       
   534 EXPORT_C void 
       
   535 g_thread_pool_push (GThreadPool  *pool,
       
   536 		    gpointer      data,
       
   537 		    GError      **error)
       
   538 {
       
   539   GRealThreadPool *real;
       
   540 
       
   541   real = (GRealThreadPool*) pool;
       
   542 
       
   543   g_return_if_fail (real);
       
   544   g_return_if_fail (real->running);
       
   545 
       
   546   g_async_queue_lock (real->queue);
       
   547 
       
   548   if (g_async_queue_length_unlocked (real->queue) >= 0)
       
   549     /* No thread is waiting in the queue */
       
   550     g_thread_pool_start_thread (real, error);
       
   551 
       
   552   g_thread_pool_queue_push_unlocked (real, data);
       
   553   g_async_queue_unlock (real->queue);
       
   554 }
       
   555 
       
   556 /**
       
   557  * g_thread_pool_set_max_threads:
       
   558  * @pool: a #GThreadPool
       
   559  * @max_threads: a new maximal number of threads for @pool
       
   560  * @error: return location for error
       
   561  * 
       
   562  * Sets the maximal allowed number of threads for @pool. A value of -1
       
   563  * means, that the maximal number of threads is unlimited.
       
   564  *
       
   565  * Setting @max_threads to 0 means stopping all work for @pool. It is
       
   566  * effectively frozen until @max_threads is set to a non-zero value
       
   567  * again.
       
   568  * 
       
   569  * A thread is never terminated while calling @func, as supplied by
       
   570  * g_thread_pool_new (). Instead the maximal number of threads only
       
   571  * has effect for the allocation of new threads in g_thread_pool_push(). 
       
   572  * A new thread is allocated, whenever the number of currently
       
   573  * running threads in @pool is smaller than the maximal number.
       
   574  *
       
   575  * @error can be %NULL to ignore errors, or non-%NULL to report
       
   576  * errors. An error can only occur when a new thread couldn't be
       
   577  * created. 
       
   578  **/
       
   579 EXPORT_C void
       
   580 g_thread_pool_set_max_threads (GThreadPool  *pool,
       
   581 			       gint          max_threads,
       
   582 			       GError      **error)
       
   583 {
       
   584   GRealThreadPool *real;
       
   585   gint to_start;
       
   586 
       
   587   real = (GRealThreadPool*) pool;
       
   588 
       
   589   g_return_if_fail (real);
       
   590   g_return_if_fail (real->running);
       
   591   g_return_if_fail (!real->pool.exclusive || max_threads != -1);
       
   592   g_return_if_fail (max_threads >= -1);
       
   593 
       
   594   g_async_queue_lock (real->queue);
       
   595 
       
   596   real->max_threads = max_threads;
       
   597   
       
   598   if (pool->exclusive)
       
   599     to_start = real->max_threads - real->num_threads;
       
   600   else
       
   601     to_start = g_async_queue_length_unlocked (real->queue);
       
   602   
       
   603   for ( ; to_start > 0; to_start--)
       
   604     {
       
   605       GError *local_error = NULL;
       
   606 
       
   607       g_thread_pool_start_thread (real, &local_error);
       
   608       if (local_error)
       
   609 	{
       
   610 	  g_propagate_error (error, local_error);
       
   611 	  break;
       
   612 	}
       
   613     }
       
   614    
       
   615   g_async_queue_unlock (real->queue);
       
   616 }
       
   617 
       
   618 /**
       
   619  * g_thread_pool_get_max_threads:
       
   620  * @pool: a #GThreadPool
       
   621  *
       
   622  * Returns the maximal number of threads for @pool.
       
   623  *
       
   624  * Return value: the maximal number of threads
       
   625  **/
       
   626 EXPORT_C gint
       
   627 g_thread_pool_get_max_threads (GThreadPool *pool)
       
   628 {
       
   629   GRealThreadPool *real;
       
   630   gint retval;
       
   631 
       
   632   real = (GRealThreadPool*) pool;
       
   633 
       
   634   g_return_val_if_fail (real, 0);
       
   635   g_return_val_if_fail (real->running, 0);
       
   636 
       
   637   g_async_queue_lock (real->queue);
       
   638   retval = real->max_threads;
       
   639   g_async_queue_unlock (real->queue);
       
   640 
       
   641   return retval;
       
   642 }
       
   643 
       
   644 /**
       
   645  * g_thread_pool_get_num_threads:
       
   646  * @pool: a #GThreadPool
       
   647  *
       
   648  * Returns the number of threads currently running in @pool.
       
   649  *
       
   650  * Return value: the number of threads currently running
       
   651  **/
       
   652 EXPORT_C guint
       
   653 g_thread_pool_get_num_threads (GThreadPool *pool)
       
   654 {
       
   655   GRealThreadPool *real;
       
   656   guint retval;
       
   657 
       
   658   real = (GRealThreadPool*) pool;
       
   659 
       
   660   g_return_val_if_fail (real, 0);
       
   661   g_return_val_if_fail (real->running, 0);
       
   662 
       
   663   g_async_queue_lock (real->queue);
       
   664   retval = real->num_threads;
       
   665   g_async_queue_unlock (real->queue);
       
   666 
       
   667   return retval;
       
   668 }
       
   669 
       
   670 /**
       
   671  * g_thread_pool_unprocessed:
       
   672  * @pool: a #GThreadPool
       
   673  *
       
   674  * Returns the number of tasks still unprocessed in @pool.
       
   675  *
       
   676  * Return value: the number of unprocessed tasks
       
   677  **/
       
   678 EXPORT_C guint
       
   679 g_thread_pool_unprocessed (GThreadPool *pool)
       
   680 {
       
   681   GRealThreadPool *real;
       
   682   gint unprocessed;
       
   683 
       
   684   real = (GRealThreadPool*) pool;
       
   685 
       
   686   g_return_val_if_fail (real, 0);
       
   687   g_return_val_if_fail (real->running, 0);
       
   688 
       
   689   unprocessed = g_async_queue_length (real->queue);
       
   690 
       
   691   return MAX (unprocessed, 0);
       
   692 }
       
   693 
       
   694 /**
       
   695  * g_thread_pool_free:
       
   696  * @pool: a #GThreadPool
       
   697  * @immediate: should @pool shut down immediately?
       
   698  * @wait: should the function wait for all tasks to be finished?
       
   699  *
       
   700  * Frees all resources allocated for @pool.
       
   701  *
       
   702  * If @immediate is %TRUE, no new task is processed for
       
   703  * @pool. Otherwise @pool is not freed before the last task is
       
   704  * processed. Note however, that no thread of this pool is
       
   705  * interrupted, while processing a task. Instead at least all still
       
   706  * running threads can finish their tasks before the @pool is freed.
       
   707  *
       
   708  * If @wait is %TRUE, the functions does not return before all tasks
       
   709  * to be processed (dependent on @immediate, whether all or only the
       
   710  * currently running) are ready. Otherwise the function returns immediately.
       
   711  *
       
   712  * After calling this function @pool must not be used anymore. 
       
   713  **/
       
   714 EXPORT_C void
       
   715 g_thread_pool_free (GThreadPool *pool,
       
   716 		    gboolean     immediate,
       
   717 		    gboolean     wait)
       
   718 {
       
   719   GRealThreadPool *real;
       
   720 
       
   721   real = (GRealThreadPool*) pool;
       
   722 
       
   723   g_return_if_fail (real);
       
   724   g_return_if_fail (real->running);
       
   725 
       
   726   /* If there's no thread allowed here, there is not much sense in
       
   727    * not stopping this pool immediately, when it's not empty 
       
   728    */
       
   729   g_return_if_fail (immediate || 
       
   730 		    real->max_threads != 0 || 
       
   731 		    g_async_queue_length (real->queue) == 0);
       
   732 
       
   733   g_async_queue_lock (real->queue);
       
   734 
       
   735   real->running = FALSE;
       
   736   real->immediate = immediate;
       
   737   real->waiting = wait;
       
   738 
       
   739   if (wait)
       
   740     {
       
   741       real->cond = g_cond_new ();
       
   742 
       
   743       while (g_async_queue_length_unlocked (real->queue) != -real->num_threads &&
       
   744 	     !(immediate && real->num_threads == 0))
       
   745 	g_cond_wait (real->cond, _g_async_queue_get_mutex (real->queue));
       
   746     }
       
   747 
       
   748   if (immediate || g_async_queue_length_unlocked (real->queue) == -real->num_threads)
       
   749     {
       
   750       /* No thread is currently doing something (and nothing is left
       
   751        * to process in the queue) 
       
   752        */
       
   753       if (real->num_threads == 0) 
       
   754 	{
       
   755 	  /* No threads left, we clean up */
       
   756 	  g_async_queue_unlock (real->queue);
       
   757 	  g_thread_pool_free_internal (real);
       
   758 	  return;
       
   759 	}
       
   760 
       
   761       g_thread_pool_wakeup_and_stop_all (real);
       
   762     }
       
   763   
       
   764   /* The last thread should cleanup the pool */
       
   765   real->waiting = FALSE; 
       
   766   g_async_queue_unlock (real->queue);
       
   767 }
       
   768 
       
   769 static void
       
   770 g_thread_pool_free_internal (GRealThreadPool* pool)
       
   771 {
       
   772   g_return_if_fail (pool);
       
   773   g_return_if_fail (pool->running == FALSE);
       
   774   g_return_if_fail (pool->num_threads == 0);
       
   775 
       
   776   g_async_queue_unref (pool->queue);
       
   777 
       
   778   if (pool->cond)
       
   779     g_cond_free (pool->cond);
       
   780 
       
   781   g_free (pool);
       
   782 }
       
   783 
       
   784 static void
       
   785 g_thread_pool_wakeup_and_stop_all (GRealThreadPool* pool)
       
   786 {
       
   787   guint i;
       
   788   
       
   789   g_return_if_fail (pool);
       
   790   g_return_if_fail (pool->running == FALSE);
       
   791   g_return_if_fail (pool->num_threads != 0);
       
   792 
       
   793   pool->immediate = TRUE; 
       
   794 
       
   795   for (i = 0; i < pool->num_threads; i++)
       
   796     g_thread_pool_queue_push_unlocked (pool, GUINT_TO_POINTER (1));
       
   797 }
       
   798 
       
   799 /**
       
   800  * g_thread_pool_set_max_unused_threads:
       
   801  * @max_threads: maximal number of unused threads
       
   802  *
       
   803  * Sets the maximal number of unused threads to @max_threads. If
       
   804  * @max_threads is -1, no limit is imposed on the number of unused
       
   805  * threads.
       
   806  **/
       
   807 EXPORT_C void
       
   808 g_thread_pool_set_max_unused_threads (gint max_threads)
       
   809 {
       
   810   g_return_if_fail (max_threads >= -1);  
       
   811 
       
   812   g_atomic_int_set (&max_unused_threads, max_threads);
       
   813 
       
   814   if (max_threads != -1)
       
   815     {
       
   816       max_threads -= g_atomic_int_get (&unused_threads);
       
   817       if (max_threads < 0)
       
   818 	{
       
   819 	  g_atomic_int_set (&kill_unused_threads, -max_threads);
       
   820 	  g_atomic_int_inc (&wakeup_thread_serial);
       
   821 
       
   822 	  g_async_queue_lock (unused_thread_queue);
       
   823 
       
   824 	  do
       
   825 	    {
       
   826 	      g_async_queue_push_unlocked (unused_thread_queue,
       
   827 					   FIX_CASTING(void *)wakeup_thread_marker);
       
   828 	    }
       
   829 	  while (++max_threads);
       
   830 
       
   831 	  g_async_queue_unlock (unused_thread_queue);
       
   832 	}
       
   833     }
       
   834 }
       
   835 
       
   836 /**
       
   837  * g_thread_pool_get_max_unused_threads:
       
   838  * 
       
   839  * Returns the maximal allowed number of unused threads.
       
   840  *
       
   841  * Return value: the maximal number of unused threads
       
   842  **/
       
   843 EXPORT_C gint
       
   844 g_thread_pool_get_max_unused_threads (void)
       
   845 {
       
   846   return g_atomic_int_get (&max_unused_threads);
       
   847 }
       
   848 
       
   849 /**
       
   850  * g_thread_pool_get_num_unused_threads:
       
   851  * 
       
   852  * Returns the number of currently unused threads.
       
   853  *
       
   854  * Return value: the number of currently unused threads
       
   855  **/
       
   856 EXPORT_C guint 
       
   857 g_thread_pool_get_num_unused_threads (void)
       
   858 {
       
   859   return g_atomic_int_get (&unused_threads);
       
   860 }
       
   861 
       
   862 /**
       
   863  * g_thread_pool_stop_unused_threads:
       
   864  * 
       
   865  * Stops all currently unused threads. This does not change the
       
   866  * maximal number of unused threads. This function can be used to
       
   867  * regularly stop all unused threads e.g. from g_timeout_add().
       
   868  **/
       
   869 EXPORT_C void
       
   870 g_thread_pool_stop_unused_threads (void)
       
   871 { 
       
   872   guint oldval;
       
   873 
       
   874   oldval = g_thread_pool_get_max_unused_threads ();
       
   875 
       
   876   g_thread_pool_set_max_unused_threads (0);
       
   877   g_thread_pool_set_max_unused_threads (oldval);
       
   878 }
       
   879 
       
   880 /**
       
   881  * g_thread_pool_set_sort_function:
       
   882  * @pool: a #GThreadPool
       
   883  * @func: the #GCompareDataFunc used to sort the list of tasks. 
       
   884  *     This function is passed two tasks. It should return
       
   885  *     0 if the order in which they are handled does not matter, 
       
   886  *     a negative value if the first task should be processed before
       
   887  *     the second or a positive value if the second task should be 
       
   888  *     processed first.
       
   889  * @user_data: user data passed to @func.
       
   890  *
       
   891  * Sets the function used to sort the list of tasks. This allows the
       
   892  * tasks to be processed by a priority determined by @func, and not
       
   893  * just in the order in which they were added to the pool.
       
   894  *
       
   895  * Note, if the maximum number of threads is more than 1, the order
       
   896  * that threads are executed can not be guaranteed 100%. Threads are
       
   897  * scheduled by the operating system and are executed at random. It
       
   898  * cannot be assumed that threads are executed in the order they are
       
   899  * created. 
       
   900  *
       
   901  * Since: 2.10
       
   902  **/
       
   903 EXPORT_C void 
       
   904 g_thread_pool_set_sort_function (GThreadPool      *pool,
       
   905 				 GCompareDataFunc  func,
       
   906 				 gpointer          user_data)
       
   907 { 
       
   908   GRealThreadPool *real;
       
   909 
       
   910   real = (GRealThreadPool*) pool;
       
   911 
       
   912   g_return_if_fail (real);
       
   913   g_return_if_fail (real->running);
       
   914 
       
   915   g_async_queue_lock (real->queue);
       
   916 
       
   917   real->sort_func = func;
       
   918   real->sort_user_data = user_data;
       
   919   
       
   920   if (func) 
       
   921     g_async_queue_sort_unlocked (real->queue, 
       
   922 				 real->sort_func,
       
   923 				 real->sort_user_data);
       
   924 
       
   925   g_async_queue_unlock (real->queue);
       
   926 }
       
   927 
       
   928 /**
       
   929  * g_thread_pool_set_max_idle_time:
       
   930  * @interval: the maximum @interval (1/1000ths of a second) a thread
       
   931  *     can be idle. 
       
   932  *
       
   933  * This function will set the maximum @interval that a thread waiting
       
   934  * in the pool for new tasks can be idle for before being
       
   935  * stopped. This function is similar to calling
       
   936  * g_thread_pool_stop_unused_threads() on a regular timeout, except,
       
   937  * this is done on a per thread basis.    
       
   938  *
       
   939  * By setting @interval to 0, idle threads will not be stopped.
       
   940  *  
       
   941  * This function makes use of g_async_queue_timed_pop () using
       
   942  * @interval.
       
   943  *
       
   944  * Since: 2.10
       
   945  **/
       
   946 EXPORT_C void
       
   947 g_thread_pool_set_max_idle_time (guint interval)
       
   948 { 
       
   949   guint i;
       
   950 
       
   951   g_atomic_int_set (&max_idle_time, interval);
       
   952 
       
   953   i = g_atomic_int_get (&unused_threads);
       
   954   if (i > 0)
       
   955     {
       
   956       g_atomic_int_inc (&wakeup_thread_serial);
       
   957       g_async_queue_lock (unused_thread_queue);
       
   958 
       
   959       do
       
   960 	{
       
   961 	  g_async_queue_push_unlocked (unused_thread_queue,
       
   962 				       FIX_CASTING(void *)wakeup_thread_marker);
       
   963 	}
       
   964       while (--i);
       
   965 
       
   966       g_async_queue_unlock (unused_thread_queue);
       
   967     }
       
   968 }
       
   969 
       
   970 /**
       
   971  * g_thread_pool_get_max_idle_time:
       
   972  * 
       
   973  * This function will return the maximum @interval that a thread will
       
   974  * wait in the thread pool for new tasks before being stopped.
       
   975  *
       
   976  * If this function returns 0, threads waiting in the thread pool for
       
   977  * new work are not stopped.
       
   978  *
       
   979  * Return value: the maximum @interval to wait for new tasks in the
       
   980  *     thread pool before stopping the thread (1/1000ths of a second).
       
   981  *  
       
   982  * Since: 2.10
       
   983  **/
       
   984 EXPORT_C guint
       
   985 g_thread_pool_get_max_idle_time (void)
       
   986 { 
       
   987   return g_atomic_int_get (&max_idle_time);
       
   988 }
       
   989 
       
   990 #define __G_THREADPOOL_C__
       
   991 #include "galiasdef.c"