releasing/makecbr/Parallel/ForkManager.pm
changeset 602 3145852acc89
equal deleted inserted replaced
600:6d08f4a05d93 602:3145852acc89
       
     1 # Copyright (c) 2000 Szab? Balázs (dLux)
       
     2 #
       
     3 # All right reserved. This program is free software; you can redistribute it 
       
     4 # and/or modify it under the same terms as Perl itself.
       
     5 #
       
     6 
       
     7 =head1 NAME
       
     8 
       
     9 Parallel::ForkManager - A simple parallel processing fork manager
       
    10 
       
    11 =head1 SYNOPSIS
       
    12 
       
    13   use Parallel::ForkManager;
       
    14 
       
    15   $pm = new Parallel::ForkManager($MAX_PROCESSES);
       
    16 
       
    17   foreach $data (@all_data) {
       
    18     # Forks and returns the pid for the child:
       
    19     my $pid = $pm->start and next; 
       
    20 
       
    21     ... do some work with $data in the child process ...
       
    22 
       
    23     $pm->finish; # Terminates the child process
       
    24   }
       
    25 
       
    26 =head1 DESCRIPTION
       
    27 
       
    28 This module is intended for use in operations that can be done in parallel 
       
    29 where the number of processes to be forked off should be limited. Typical 
       
    30 use is a downloader which will be retrieving hundreds/thousands of files.
       
    31 
       
    32 The code for a downloader would look something like this:
       
    33 
       
    34   use LWP::Simple;
       
    35   use Parallel::ForkManager;
       
    36 
       
    37   ...
       
    38   
       
    39   @links=( 
       
    40     ["http://www.foo.bar/rulez.data","rulez_data.txt"], 
       
    41     ["http://new.host/more_data.doc","more_data.doc"],
       
    42     ...
       
    43   );
       
    44 
       
    45   ...
       
    46 
       
    47   # Max 30 processes for parallel download
       
    48   my $pm = new Parallel::ForkManager(30); 
       
    49 
       
    50   foreach my $linkarray (@links) {
       
    51     $pm->start and next; # do the fork
       
    52 
       
    53     my ($link,$fn) = @$linkarray;
       
    54     warn "Cannot get $fn from $link"
       
    55       if getstore($link,$fn) != RC_OK;
       
    56 
       
    57     $pm->finish; # do the exit in the child process
       
    58   }
       
    59   $pm->wait_all_children;
       
    60 
       
    61 First you need to instantiate the ForkManager with the "new" constructor. 
       
    62 You must specify the maximum number of processes to be created. If you 
       
    63 specify 0, then NO fork will be done; this is good for debugging purposes.
       
    64 
       
    65 Next, use $pm->start to do the fork. $pm returns 0 for the child process, 
       
    66 and child pid for the parent process (see also L<perlfunc(1p)/fork()>). 
       
    67 The "and next" skips the internal loop in the parent process. NOTE: 
       
    68 $pm->start dies if the fork fails.
       
    69 
       
    70 $pm->finish terminates the child process (assuming a fork was done in the 
       
    71 "start").
       
    72 
       
    73 NOTE: You cannot use $pm->start if you are already in the child process. 
       
    74 If you want to manage another set of subprocesses in the child process, 
       
    75 you must instantiate another Parallel::ForkManager object!
       
    76 
       
    77 =head1 METHODS
       
    78 
       
    79 =over 5
       
    80 
       
    81 =item new $processes
       
    82 
       
    83 Instantiate a new Parallel::ForkManager object. You must specify the maximum 
       
    84 number of children to fork off. If you specify 0 (zero), then no children 
       
    85 will be forked. This is intended for debugging purposes.
       
    86 
       
    87 =item start [ $process_identifier ]
       
    88 
       
    89 This method does the fork. It returns the pid of the child process for 
       
    90 the parent, and 0 for the child process. If the $processes parameter 
       
    91 for the constructor is 0 then, assuming you're in the child process, 
       
    92 $pm->start simply returns 0.
       
    93 
       
    94 An optional $process_identifier can be provided to this method... It is used by 
       
    95 the "run_on_finish" callback (see CALLBACKS) for identifying the finished
       
    96 process.
       
    97 
       
    98 =item finish [ $exit_code ]
       
    99 
       
   100 Closes the child process by exiting and accepts an optional exit code 
       
   101 (default exit code is 0) which can be retrieved in the parent via callback. 
       
   102 If you use the program in debug mode ($processes == 0), this method doesn't 
       
   103 do anything.
       
   104 
       
   105 =item set_max_procs $processes
       
   106 
       
   107 Allows you to set a new maximum number of children to maintain. Returns 
       
   108 the previous setting.
       
   109 
       
   110 =item wait_all_children
       
   111 
       
   112 You can call this method to wait for all the processes which have been 
       
   113 forked. This is a blocking wait.
       
   114 
       
   115 =back
       
   116 
       
   117 =head1 CALLBACKS
       
   118 
       
   119 You can define callbacks in the code, which are called on events like starting 
       
   120 a process or upon finish.
       
   121 
       
   122 The callbacks can be defined with the following methods:
       
   123 
       
   124 =over 4
       
   125 
       
   126 =item run_on_finish $code [, $pid ]
       
   127 
       
   128 You can define a subroutine which is called when a child is terminated. It is
       
   129 called in the parent process.
       
   130 
       
   131 The paremeters of the $code are the following:
       
   132 
       
   133   - pid of the process, which is terminated
       
   134   - exit code of the program
       
   135   - identification of the process (if provided in the "start" method)
       
   136   - exit signal (0-127: signal name)
       
   137   - core dump (1 if there was core dump at exit)
       
   138 
       
   139 =item run_on_start $code
       
   140 
       
   141 You can define a subroutine which is called when a child is started. It called
       
   142 after the successful startup of a child in the parent process.
       
   143 
       
   144 The parameters of the $code are the following:
       
   145 
       
   146   - pid of the process which has been started
       
   147   - identification of the process (if provided in the "start" method)
       
   148 
       
   149 =item run_on_wait $code, [$period]
       
   150 
       
   151 You can define a subroutine which is called when the child process needs to wait
       
   152 for the startup. If $period is not defined, then one call is done per
       
   153 child. If $period is defined, then $code is called periodically and the
       
   154 module waits for $period seconds betwen the two calls. Note, $period can be
       
   155 fractional number also. The exact "$period seconds" is not guarranteed,
       
   156 signals can shorten and the process scheduler can make it longer (on busy
       
   157 systems).
       
   158 
       
   159 The $code called in the "start" and the "wait_all_children" method also.
       
   160 
       
   161 No parameters are passed to the $code on the call.
       
   162 
       
   163 =back
       
   164 
       
   165 =head1 EXAMPLE
       
   166 
       
   167 =head2 Parallel get
       
   168 
       
   169 This small example can be used to get URLs in parallel.
       
   170 
       
   171   use Parallel::ForkManager;
       
   172   use LWP::Simple;
       
   173   my $pm=new Parallel::ForkManager(10);
       
   174   for my $link (@ARGV) {
       
   175     $pm->start and next;
       
   176     my ($fn)= $link =~ /^.*\/(.*?)$/;
       
   177     if (!$fn) {
       
   178       warn "Cannot determine filename from $fn\n";
       
   179     } else {
       
   180       $0.=" ".$fn;
       
   181       print "Getting $fn from $link\n";
       
   182       my $rc=getstore($link,$fn);
       
   183       print "$link downloaded. response code: $rc\n";
       
   184     };
       
   185     $pm->finish;
       
   186   };
       
   187 
       
   188 =head2 Callbacks
       
   189 
       
   190 Example of a program using callbacks to get child exit codes:
       
   191 
       
   192   use strict;
       
   193   use Parallel::ForkManager;
       
   194 
       
   195   my $max_procs = 5;
       
   196   my @names = qw( Fred Jim Lily Steve Jessica Bob Dave Christine Rico Sara );
       
   197   # hash to resolve PID's back to child specific information
       
   198 
       
   199   my $pm =  new Parallel::ForkManager($max_procs);
       
   200 
       
   201   # Setup a callback for when a child finishes up so we can
       
   202   # get it's exit code
       
   203   $pm->run_on_finish(
       
   204     sub { my ($pid, $exit_code, $ident) = @_;
       
   205       print "** $ident just got out of the pool ".
       
   206         "with PID $pid and exit code: $exit_code\n";
       
   207     }
       
   208   );
       
   209 
       
   210   $pm->run_on_start(
       
   211     sub { my ($pid,$ident)=@_;
       
   212       print "** $ident started, pid: $pid\n";
       
   213     }
       
   214   );
       
   215 
       
   216   $pm->run_on_wait(
       
   217     sub {
       
   218       print "** Have to wait for one children ...\n"
       
   219     },
       
   220     0.5
       
   221   );
       
   222 
       
   223   foreach my $child ( 0 .. $#names ) {
       
   224     my $pid = $pm->start($names[$child]) and next;
       
   225 
       
   226     # This code is the child process
       
   227     print "This is $names[$child], Child number $child\n";
       
   228     sleep ( 2 * $child );
       
   229     print "$names[$child], Child $child is about to get out...\n";
       
   230     sleep 1;
       
   231     $pm->finish($child); # pass an exit code to finish
       
   232   }
       
   233 
       
   234   print "Waiting for Children...\n";
       
   235   $pm->wait_all_children;
       
   236   print "Everybody is out of the pool!\n";
       
   237 
       
   238 =head1 BUGS AND LIMITATIONS
       
   239 
       
   240 Do not use Parallel::ForkManager in an environment, where other child
       
   241 processes can affect the run of the main program, so using this module
       
   242 is not recommended in an environment where fork() / wait() is already used.
       
   243 
       
   244 If you want to use more than one copies of the Parallel::ForkManager, then
       
   245 you have to make sure that all children processes are terminated, before you
       
   246 use the second object in the main program.
       
   247 
       
   248 You are free to use a new copy of Parallel::ForkManager in the child
       
   249 processes, although I don't think it makes sense.
       
   250 
       
   251 =head1 COPYRIGHT
       
   252 
       
   253 Copyright (c) 2000 Szabó, Balázs (dLux)
       
   254 
       
   255 All right reserved. This program is free software; you can redistribute it 
       
   256 and/or modify it under the same terms as Perl itself.
       
   257 
       
   258 =head1 AUTHOR
       
   259 
       
   260   dLux (Szabó, Balázs) <dlux@kapu.hu>
       
   261 
       
   262 =head1 CREDITS
       
   263 
       
   264   Noah Robin <sitz@onastick.net> (documentation tweaks)
       
   265   Chuck Hirstius <chirstius@megapathdsl.net> (callback exit status, example)
       
   266   Grant Hopwood <hopwoodg@valero.com> (win32 port)
       
   267   Mark Southern <mark_southern@merck.com> (bugfix)
       
   268 
       
   269 =cut
       
   270 
       
   271 package Parallel::ForkManager;
       
   272 use POSIX ":sys_wait_h";
       
   273 use strict;
       
   274 use vars qw($VERSION);
       
   275 $VERSION='0.7.5';
       
   276 
       
   277 sub new { my ($c,$processes)=@_;
       
   278   my $h={
       
   279     max_proc   => $processes,
       
   280     processes  => {},
       
   281     in_child   => 0,
       
   282   };
       
   283   return bless($h,ref($c)||$c);
       
   284 };
       
   285 
       
   286 sub start { my ($s,$identification)=@_;
       
   287   die "Cannot start another process while you are in the child process"
       
   288     if $s->{in_child};
       
   289   while ($s->{max_proc} && ( keys %{ $s->{processes} } ) >= $s->{max_proc}) {
       
   290     $s->on_wait;
       
   291     $s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef);
       
   292   };
       
   293   $s->wait_children;
       
   294   if ($s->{max_proc}) {
       
   295     my $pid=fork();
       
   296     die "Cannot fork: $!" if !defined $pid;
       
   297     if ($pid) {
       
   298       $s->{processes}->{$pid}=$identification;
       
   299       $s->on_start($pid,$identification);
       
   300     } else {
       
   301       $s->{in_child}=1 if !$pid;
       
   302     }
       
   303     return $pid;
       
   304   } else {
       
   305     $s->{processes}->{$$}=$identification;
       
   306     $s->on_start($$,$identification);
       
   307     return 0; # Simulating the child which returns 0
       
   308   }
       
   309 }
       
   310 
       
   311 sub finish { my ($s, $x)=@_;
       
   312   if ( $s->{in_child} ) {
       
   313     exit ($x || 0);
       
   314   }
       
   315   if ($s->{max_proc} == 0) { # max_proc == 0
       
   316     $s->on_finish($$, $x ,$s->{processes}->{$$}, 0, 0);
       
   317     delete $s->{processes}->{$$};
       
   318   }
       
   319   return 0;
       
   320 }
       
   321 
       
   322 sub wait_children { my ($s)=@_;
       
   323   return if !keys %{$s->{processes}};
       
   324   my $kid;
       
   325   do {
       
   326     $kid = $s->wait_one_child(&WNOHANG);
       
   327   } while $kid > 0 || $kid < -1; # AS 5.6/Win32 returns negative PIDs
       
   328 };
       
   329 
       
   330 *wait_childs=*wait_children; # compatibility
       
   331 
       
   332 sub wait_one_child { my ($s,$par)=@_;
       
   333   my $kid;
       
   334   while (1) {
       
   335     $kid = $s->_waitpid(-1,$par||=0);
       
   336     last if $kid == 0 || $kid == -1; # AS 5.6/Win32 returns negative PIDs
       
   337     redo if !exists $s->{processes}->{$kid};
       
   338     my $id = delete $s->{processes}->{$kid};
       
   339     $s->on_finish( $kid, $? >> 8 , $id, $? & 0x7f, $? & 0x80 ? 1 : 0);
       
   340     last;
       
   341   }
       
   342   $kid;
       
   343 };
       
   344 
       
   345 sub wait_all_children { my ($s)=@_;
       
   346   while (keys %{ $s->{processes} }) {
       
   347     $s->on_wait;
       
   348     $s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef);
       
   349   };
       
   350 }
       
   351 
       
   352 *wait_all_childs=*wait_all_children; # compatibility;
       
   353 
       
   354 sub run_on_finish { my ($s,$code,$pid)=@_;
       
   355   $s->{on_finish}->{$pid || 0}=$code;
       
   356 }
       
   357 
       
   358 sub on_finish { my ($s,$pid,@par)=@_;
       
   359   my $code=$s->{on_finish}->{$pid} || $s->{on_finish}->{0} or return 0;
       
   360   $code->($pid,@par); 
       
   361 };
       
   362 
       
   363 sub run_on_wait { my ($s,$code, $period)=@_;
       
   364   $s->{on_wait}=$code;
       
   365   $s->{on_wait_period} = $period;
       
   366 }
       
   367 
       
   368 sub on_wait { my ($s)=@_;
       
   369   if(ref($s->{on_wait}) eq 'CODE') {
       
   370     $s->{on_wait}->();
       
   371     if (defined $s->{on_wait_period}) {
       
   372         local $SIG{CHLD} = sub { } if ! defined $SIG{CHLD};
       
   373         select undef, undef, undef, $s->{on_wait_period}
       
   374     };
       
   375   };
       
   376 };
       
   377 
       
   378 sub run_on_start { my ($s,$code)=@_;
       
   379   $s->{on_start}=$code;
       
   380 }
       
   381 
       
   382 sub on_start { my ($s,@par)=@_;
       
   383   $s->{on_start}->(@par) if ref($s->{on_start}) eq 'CODE';
       
   384 };
       
   385 
       
   386 sub set_max_procs { my ($s, $mp)=@_;
       
   387   $s->{max_proc} = $mp;
       
   388 }
       
   389 
       
   390 # OS dependant code follows...
       
   391 
       
   392 sub _waitpid { # Call waitpid() in the standard Unix fashion.
       
   393   return waitpid($_[1],$_[2]);
       
   394 }
       
   395 
       
   396 # On ActiveState Perl 5.6/Win32 build 625, waitpid(-1, &WNOHANG) always
       
   397 # blocks unless an actual PID other than -1 is given.
       
   398 sub _NT_waitpid { my ($s, $pid, $par) = @_;
       
   399   if ($par == &WNOHANG) { # Need to nonblock on each of our PIDs in the pool.
       
   400     my @pids = keys %{ $s->{processes} };
       
   401     # Simulate -1 (no processes awaiting cleanup.)
       
   402     return -1 unless scalar(@pids);
       
   403     # Check each PID in the pool.
       
   404     my $kid;
       
   405     foreach $pid (@pids) {
       
   406       $kid = waitpid($pid, $par);
       
   407       return $kid if $kid != 0; # AS 5.6/Win32 returns negative PIDs.
       
   408     }
       
   409     return $kid;
       
   410   } else { # Normal waitpid() call.
       
   411     return waitpid($pid, $par);
       
   412   }
       
   413 }
       
   414 
       
   415 {
       
   416   local $^W = 0;
       
   417   if ($^O eq 'NT' or $^O eq 'MSWin32') {
       
   418     *_waitpid = \&_NT_waitpid;
       
   419   }
       
   420 }
       
   421 
       
   422 1;