releasing/makecbr/Parallel/ForkManager.pm
changeset 607 378360dbbdba
parent 602 3145852acc89
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/releasing/makecbr/Parallel/ForkManager.pm	Wed Jun 30 11:35:58 2010 +0800
@@ -0,0 +1,422 @@
+# Copyright (c) 2000 Szab? Balázs (dLux)
+#
+# All right reserved. This program is free software; you can redistribute it 
+# and/or modify it under the same terms as Perl itself.
+#
+
+=head1 NAME
+
+Parallel::ForkManager - A simple parallel processing fork manager
+
+=head1 SYNOPSIS
+
+  use Parallel::ForkManager;
+
+  $pm = new Parallel::ForkManager($MAX_PROCESSES);
+
+  foreach $data (@all_data) {
+    # Forks and returns the pid for the child:
+    my $pid = $pm->start and next; 
+
+    ... do some work with $data in the child process ...
+
+    $pm->finish; # Terminates the child process
+  }
+
+=head1 DESCRIPTION
+
+This module is intended for use in operations that can be done in parallel 
+where the number of processes to be forked off should be limited. Typical 
+use is a downloader which will be retrieving hundreds/thousands of files.
+
+The code for a downloader would look something like this:
+
+  use LWP::Simple;
+  use Parallel::ForkManager;
+
+  ...
+  
+  @links=( 
+    ["http://www.foo.bar/rulez.data","rulez_data.txt"], 
+    ["http://new.host/more_data.doc","more_data.doc"],
+    ...
+  );
+
+  ...
+
+  # Max 30 processes for parallel download
+  my $pm = new Parallel::ForkManager(30); 
+
+  foreach my $linkarray (@links) {
+    $pm->start and next; # do the fork
+
+    my ($link,$fn) = @$linkarray;
+    warn "Cannot get $fn from $link"
+      if getstore($link,$fn) != RC_OK;
+
+    $pm->finish; # do the exit in the child process
+  }
+  $pm->wait_all_children;
+
+First you need to instantiate the ForkManager with the "new" constructor. 
+You must specify the maximum number of processes to be created. If you 
+specify 0, then NO fork will be done; this is good for debugging purposes.
+
+Next, use $pm->start to do the fork. $pm returns 0 for the child process, 
+and child pid for the parent process (see also L<perlfunc(1p)/fork()>). 
+The "and next" skips the internal loop in the parent process. NOTE: 
+$pm->start dies if the fork fails.
+
+$pm->finish terminates the child process (assuming a fork was done in the 
+"start").
+
+NOTE: You cannot use $pm->start if you are already in the child process. 
+If you want to manage another set of subprocesses in the child process, 
+you must instantiate another Parallel::ForkManager object!
+
+=head1 METHODS
+
+=over 5
+
+=item new $processes
+
+Instantiate a new Parallel::ForkManager object. You must specify the maximum 
+number of children to fork off. If you specify 0 (zero), then no children 
+will be forked. This is intended for debugging purposes.
+
+=item start [ $process_identifier ]
+
+This method does the fork. It returns the pid of the child process for 
+the parent, and 0 for the child process. If the $processes parameter 
+for the constructor is 0 then, assuming you're in the child process, 
+$pm->start simply returns 0.
+
+An optional $process_identifier can be provided to this method... It is used by 
+the "run_on_finish" callback (see CALLBACKS) for identifying the finished
+process.
+
+=item finish [ $exit_code ]
+
+Closes the child process by exiting and accepts an optional exit code 
+(default exit code is 0) which can be retrieved in the parent via callback. 
+If you use the program in debug mode ($processes == 0), this method doesn't 
+do anything.
+
+=item set_max_procs $processes
+
+Allows you to set a new maximum number of children to maintain. Returns 
+the previous setting.
+
+=item wait_all_children
+
+You can call this method to wait for all the processes which have been 
+forked. This is a blocking wait.
+
+=back
+
+=head1 CALLBACKS
+
+You can define callbacks in the code, which are called on events like starting 
+a process or upon finish.
+
+The callbacks can be defined with the following methods:
+
+=over 4
+
+=item run_on_finish $code [, $pid ]
+
+You can define a subroutine which is called when a child is terminated. It is
+called in the parent process.
+
+The paremeters of the $code are the following:
+
+  - pid of the process, which is terminated
+  - exit code of the program
+  - identification of the process (if provided in the "start" method)
+  - exit signal (0-127: signal name)
+  - core dump (1 if there was core dump at exit)
+
+=item run_on_start $code
+
+You can define a subroutine which is called when a child is started. It called
+after the successful startup of a child in the parent process.
+
+The parameters of the $code are the following:
+
+  - pid of the process which has been started
+  - identification of the process (if provided in the "start" method)
+
+=item run_on_wait $code, [$period]
+
+You can define a subroutine which is called when the child process needs to wait
+for the startup. If $period is not defined, then one call is done per
+child. If $period is defined, then $code is called periodically and the
+module waits for $period seconds betwen the two calls. Note, $period can be
+fractional number also. The exact "$period seconds" is not guarranteed,
+signals can shorten and the process scheduler can make it longer (on busy
+systems).
+
+The $code called in the "start" and the "wait_all_children" method also.
+
+No parameters are passed to the $code on the call.
+
+=back
+
+=head1 EXAMPLE
+
+=head2 Parallel get
+
+This small example can be used to get URLs in parallel.
+
+  use Parallel::ForkManager;
+  use LWP::Simple;
+  my $pm=new Parallel::ForkManager(10);
+  for my $link (@ARGV) {
+    $pm->start and next;
+    my ($fn)= $link =~ /^.*\/(.*?)$/;
+    if (!$fn) {
+      warn "Cannot determine filename from $fn\n";
+    } else {
+      $0.=" ".$fn;
+      print "Getting $fn from $link\n";
+      my $rc=getstore($link,$fn);
+      print "$link downloaded. response code: $rc\n";
+    };
+    $pm->finish;
+  };
+
+=head2 Callbacks
+
+Example of a program using callbacks to get child exit codes:
+
+  use strict;
+  use Parallel::ForkManager;
+
+  my $max_procs = 5;
+  my @names = qw( Fred Jim Lily Steve Jessica Bob Dave Christine Rico Sara );
+  # hash to resolve PID's back to child specific information
+
+  my $pm =  new Parallel::ForkManager($max_procs);
+
+  # Setup a callback for when a child finishes up so we can
+  # get it's exit code
+  $pm->run_on_finish(
+    sub { my ($pid, $exit_code, $ident) = @_;
+      print "** $ident just got out of the pool ".
+        "with PID $pid and exit code: $exit_code\n";
+    }
+  );
+
+  $pm->run_on_start(
+    sub { my ($pid,$ident)=@_;
+      print "** $ident started, pid: $pid\n";
+    }
+  );
+
+  $pm->run_on_wait(
+    sub {
+      print "** Have to wait for one children ...\n"
+    },
+    0.5
+  );
+
+  foreach my $child ( 0 .. $#names ) {
+    my $pid = $pm->start($names[$child]) and next;
+
+    # This code is the child process
+    print "This is $names[$child], Child number $child\n";
+    sleep ( 2 * $child );
+    print "$names[$child], Child $child is about to get out...\n";
+    sleep 1;
+    $pm->finish($child); # pass an exit code to finish
+  }
+
+  print "Waiting for Children...\n";
+  $pm->wait_all_children;
+  print "Everybody is out of the pool!\n";
+
+=head1 BUGS AND LIMITATIONS
+
+Do not use Parallel::ForkManager in an environment, where other child
+processes can affect the run of the main program, so using this module
+is not recommended in an environment where fork() / wait() is already used.
+
+If you want to use more than one copies of the Parallel::ForkManager, then
+you have to make sure that all children processes are terminated, before you
+use the second object in the main program.
+
+You are free to use a new copy of Parallel::ForkManager in the child
+processes, although I don't think it makes sense.
+
+=head1 COPYRIGHT
+
+Copyright (c) 2000 Szabó, Balázs (dLux)
+
+All right reserved. This program is free software; you can redistribute it 
+and/or modify it under the same terms as Perl itself.
+
+=head1 AUTHOR
+
+  dLux (Szabó, Balázs) <dlux@kapu.hu>
+
+=head1 CREDITS
+
+  Noah Robin <sitz@onastick.net> (documentation tweaks)
+  Chuck Hirstius <chirstius@megapathdsl.net> (callback exit status, example)
+  Grant Hopwood <hopwoodg@valero.com> (win32 port)
+  Mark Southern <mark_southern@merck.com> (bugfix)
+
+=cut
+
+package Parallel::ForkManager;
+use POSIX ":sys_wait_h";
+use strict;
+use vars qw($VERSION);
+$VERSION='0.7.5';
+
+sub new { my ($c,$processes)=@_;
+  my $h={
+    max_proc   => $processes,
+    processes  => {},
+    in_child   => 0,
+  };
+  return bless($h,ref($c)||$c);
+};
+
+sub start { my ($s,$identification)=@_;
+  die "Cannot start another process while you are in the child process"
+    if $s->{in_child};
+  while ($s->{max_proc} && ( keys %{ $s->{processes} } ) >= $s->{max_proc}) {
+    $s->on_wait;
+    $s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef);
+  };
+  $s->wait_children;
+  if ($s->{max_proc}) {
+    my $pid=fork();
+    die "Cannot fork: $!" if !defined $pid;
+    if ($pid) {
+      $s->{processes}->{$pid}=$identification;
+      $s->on_start($pid,$identification);
+    } else {
+      $s->{in_child}=1 if !$pid;
+    }
+    return $pid;
+  } else {
+    $s->{processes}->{$$}=$identification;
+    $s->on_start($$,$identification);
+    return 0; # Simulating the child which returns 0
+  }
+}
+
+sub finish { my ($s, $x)=@_;
+  if ( $s->{in_child} ) {
+    exit ($x || 0);
+  }
+  if ($s->{max_proc} == 0) { # max_proc == 0
+    $s->on_finish($$, $x ,$s->{processes}->{$$}, 0, 0);
+    delete $s->{processes}->{$$};
+  }
+  return 0;
+}
+
+sub wait_children { my ($s)=@_;
+  return if !keys %{$s->{processes}};
+  my $kid;
+  do {
+    $kid = $s->wait_one_child(&WNOHANG);
+  } while $kid > 0 || $kid < -1; # AS 5.6/Win32 returns negative PIDs
+};
+
+*wait_childs=*wait_children; # compatibility
+
+sub wait_one_child { my ($s,$par)=@_;
+  my $kid;
+  while (1) {
+    $kid = $s->_waitpid(-1,$par||=0);
+    last if $kid == 0 || $kid == -1; # AS 5.6/Win32 returns negative PIDs
+    redo if !exists $s->{processes}->{$kid};
+    my $id = delete $s->{processes}->{$kid};
+    $s->on_finish( $kid, $? >> 8 , $id, $? & 0x7f, $? & 0x80 ? 1 : 0);
+    last;
+  }
+  $kid;
+};
+
+sub wait_all_children { my ($s)=@_;
+  while (keys %{ $s->{processes} }) {
+    $s->on_wait;
+    $s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef);
+  };
+}
+
+*wait_all_childs=*wait_all_children; # compatibility;
+
+sub run_on_finish { my ($s,$code,$pid)=@_;
+  $s->{on_finish}->{$pid || 0}=$code;
+}
+
+sub on_finish { my ($s,$pid,@par)=@_;
+  my $code=$s->{on_finish}->{$pid} || $s->{on_finish}->{0} or return 0;
+  $code->($pid,@par); 
+};
+
+sub run_on_wait { my ($s,$code, $period)=@_;
+  $s->{on_wait}=$code;
+  $s->{on_wait_period} = $period;
+}
+
+sub on_wait { my ($s)=@_;
+  if(ref($s->{on_wait}) eq 'CODE') {
+    $s->{on_wait}->();
+    if (defined $s->{on_wait_period}) {
+        local $SIG{CHLD} = sub { } if ! defined $SIG{CHLD};
+        select undef, undef, undef, $s->{on_wait_period}
+    };
+  };
+};
+
+sub run_on_start { my ($s,$code)=@_;
+  $s->{on_start}=$code;
+}
+
+sub on_start { my ($s,@par)=@_;
+  $s->{on_start}->(@par) if ref($s->{on_start}) eq 'CODE';
+};
+
+sub set_max_procs { my ($s, $mp)=@_;
+  $s->{max_proc} = $mp;
+}
+
+# OS dependant code follows...
+
+sub _waitpid { # Call waitpid() in the standard Unix fashion.
+  return waitpid($_[1],$_[2]);
+}
+
+# On ActiveState Perl 5.6/Win32 build 625, waitpid(-1, &WNOHANG) always
+# blocks unless an actual PID other than -1 is given.
+sub _NT_waitpid { my ($s, $pid, $par) = @_;
+  if ($par == &WNOHANG) { # Need to nonblock on each of our PIDs in the pool.
+    my @pids = keys %{ $s->{processes} };
+    # Simulate -1 (no processes awaiting cleanup.)
+    return -1 unless scalar(@pids);
+    # Check each PID in the pool.
+    my $kid;
+    foreach $pid (@pids) {
+      $kid = waitpid($pid, $par);
+      return $kid if $kid != 0; # AS 5.6/Win32 returns negative PIDs.
+    }
+    return $kid;
+  } else { # Normal waitpid() call.
+    return waitpid($pid, $par);
+  }
+}
+
+{
+  local $^W = 0;
+  if ($^O eq 'NT' or $^O eq 'MSWin32') {
+    *_waitpid = \&_NT_waitpid;
+  }
+}
+
+1;