diff -r 22486c9c7b15 -r 378360dbbdba releasing/makecbr/Parallel/ForkManager.pm --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/releasing/makecbr/Parallel/ForkManager.pm Wed Jun 30 11:35:58 2010 +0800 @@ -0,0 +1,422 @@ +# Copyright (c) 2000 Szab? Balázs (dLux) +# +# All right reserved. This program is free software; you can redistribute it +# and/or modify it under the same terms as Perl itself. +# + +=head1 NAME + +Parallel::ForkManager - A simple parallel processing fork manager + +=head1 SYNOPSIS + + use Parallel::ForkManager; + + $pm = new Parallel::ForkManager($MAX_PROCESSES); + + foreach $data (@all_data) { + # Forks and returns the pid for the child: + my $pid = $pm->start and next; + + ... do some work with $data in the child process ... + + $pm->finish; # Terminates the child process + } + +=head1 DESCRIPTION + +This module is intended for use in operations that can be done in parallel +where the number of processes to be forked off should be limited. Typical +use is a downloader which will be retrieving hundreds/thousands of files. + +The code for a downloader would look something like this: + + use LWP::Simple; + use Parallel::ForkManager; + + ... + + @links=( + ["http://www.foo.bar/rulez.data","rulez_data.txt"], + ["http://new.host/more_data.doc","more_data.doc"], + ... + ); + + ... + + # Max 30 processes for parallel download + my $pm = new Parallel::ForkManager(30); + + foreach my $linkarray (@links) { + $pm->start and next; # do the fork + + my ($link,$fn) = @$linkarray; + warn "Cannot get $fn from $link" + if getstore($link,$fn) != RC_OK; + + $pm->finish; # do the exit in the child process + } + $pm->wait_all_children; + +First you need to instantiate the ForkManager with the "new" constructor. +You must specify the maximum number of processes to be created. If you +specify 0, then NO fork will be done; this is good for debugging purposes. + +Next, use $pm->start to do the fork. $pm returns 0 for the child process, +and child pid for the parent process (see also L). +The "and next" skips the internal loop in the parent process. NOTE: +$pm->start dies if the fork fails. + +$pm->finish terminates the child process (assuming a fork was done in the +"start"). + +NOTE: You cannot use $pm->start if you are already in the child process. +If you want to manage another set of subprocesses in the child process, +you must instantiate another Parallel::ForkManager object! + +=head1 METHODS + +=over 5 + +=item new $processes + +Instantiate a new Parallel::ForkManager object. You must specify the maximum +number of children to fork off. If you specify 0 (zero), then no children +will be forked. This is intended for debugging purposes. + +=item start [ $process_identifier ] + +This method does the fork. It returns the pid of the child process for +the parent, and 0 for the child process. If the $processes parameter +for the constructor is 0 then, assuming you're in the child process, +$pm->start simply returns 0. + +An optional $process_identifier can be provided to this method... It is used by +the "run_on_finish" callback (see CALLBACKS) for identifying the finished +process. + +=item finish [ $exit_code ] + +Closes the child process by exiting and accepts an optional exit code +(default exit code is 0) which can be retrieved in the parent via callback. +If you use the program in debug mode ($processes == 0), this method doesn't +do anything. + +=item set_max_procs $processes + +Allows you to set a new maximum number of children to maintain. Returns +the previous setting. + +=item wait_all_children + +You can call this method to wait for all the processes which have been +forked. This is a blocking wait. + +=back + +=head1 CALLBACKS + +You can define callbacks in the code, which are called on events like starting +a process or upon finish. + +The callbacks can be defined with the following methods: + +=over 4 + +=item run_on_finish $code [, $pid ] + +You can define a subroutine which is called when a child is terminated. It is +called in the parent process. + +The paremeters of the $code are the following: + + - pid of the process, which is terminated + - exit code of the program + - identification of the process (if provided in the "start" method) + - exit signal (0-127: signal name) + - core dump (1 if there was core dump at exit) + +=item run_on_start $code + +You can define a subroutine which is called when a child is started. It called +after the successful startup of a child in the parent process. + +The parameters of the $code are the following: + + - pid of the process which has been started + - identification of the process (if provided in the "start" method) + +=item run_on_wait $code, [$period] + +You can define a subroutine which is called when the child process needs to wait +for the startup. If $period is not defined, then one call is done per +child. If $period is defined, then $code is called periodically and the +module waits for $period seconds betwen the two calls. Note, $period can be +fractional number also. The exact "$period seconds" is not guarranteed, +signals can shorten and the process scheduler can make it longer (on busy +systems). + +The $code called in the "start" and the "wait_all_children" method also. + +No parameters are passed to the $code on the call. + +=back + +=head1 EXAMPLE + +=head2 Parallel get + +This small example can be used to get URLs in parallel. + + use Parallel::ForkManager; + use LWP::Simple; + my $pm=new Parallel::ForkManager(10); + for my $link (@ARGV) { + $pm->start and next; + my ($fn)= $link =~ /^.*\/(.*?)$/; + if (!$fn) { + warn "Cannot determine filename from $fn\n"; + } else { + $0.=" ".$fn; + print "Getting $fn from $link\n"; + my $rc=getstore($link,$fn); + print "$link downloaded. response code: $rc\n"; + }; + $pm->finish; + }; + +=head2 Callbacks + +Example of a program using callbacks to get child exit codes: + + use strict; + use Parallel::ForkManager; + + my $max_procs = 5; + my @names = qw( Fred Jim Lily Steve Jessica Bob Dave Christine Rico Sara ); + # hash to resolve PID's back to child specific information + + my $pm = new Parallel::ForkManager($max_procs); + + # Setup a callback for when a child finishes up so we can + # get it's exit code + $pm->run_on_finish( + sub { my ($pid, $exit_code, $ident) = @_; + print "** $ident just got out of the pool ". + "with PID $pid and exit code: $exit_code\n"; + } + ); + + $pm->run_on_start( + sub { my ($pid,$ident)=@_; + print "** $ident started, pid: $pid\n"; + } + ); + + $pm->run_on_wait( + sub { + print "** Have to wait for one children ...\n" + }, + 0.5 + ); + + foreach my $child ( 0 .. $#names ) { + my $pid = $pm->start($names[$child]) and next; + + # This code is the child process + print "This is $names[$child], Child number $child\n"; + sleep ( 2 * $child ); + print "$names[$child], Child $child is about to get out...\n"; + sleep 1; + $pm->finish($child); # pass an exit code to finish + } + + print "Waiting for Children...\n"; + $pm->wait_all_children; + print "Everybody is out of the pool!\n"; + +=head1 BUGS AND LIMITATIONS + +Do not use Parallel::ForkManager in an environment, where other child +processes can affect the run of the main program, so using this module +is not recommended in an environment where fork() / wait() is already used. + +If you want to use more than one copies of the Parallel::ForkManager, then +you have to make sure that all children processes are terminated, before you +use the second object in the main program. + +You are free to use a new copy of Parallel::ForkManager in the child +processes, although I don't think it makes sense. + +=head1 COPYRIGHT + +Copyright (c) 2000 Szabó, Balázs (dLux) + +All right reserved. This program is free software; you can redistribute it +and/or modify it under the same terms as Perl itself. + +=head1 AUTHOR + + dLux (Szabó, Balázs) + +=head1 CREDITS + + Noah Robin (documentation tweaks) + Chuck Hirstius (callback exit status, example) + Grant Hopwood (win32 port) + Mark Southern (bugfix) + +=cut + +package Parallel::ForkManager; +use POSIX ":sys_wait_h"; +use strict; +use vars qw($VERSION); +$VERSION='0.7.5'; + +sub new { my ($c,$processes)=@_; + my $h={ + max_proc => $processes, + processes => {}, + in_child => 0, + }; + return bless($h,ref($c)||$c); +}; + +sub start { my ($s,$identification)=@_; + die "Cannot start another process while you are in the child process" + if $s->{in_child}; + while ($s->{max_proc} && ( keys %{ $s->{processes} } ) >= $s->{max_proc}) { + $s->on_wait; + $s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef); + }; + $s->wait_children; + if ($s->{max_proc}) { + my $pid=fork(); + die "Cannot fork: $!" if !defined $pid; + if ($pid) { + $s->{processes}->{$pid}=$identification; + $s->on_start($pid,$identification); + } else { + $s->{in_child}=1 if !$pid; + } + return $pid; + } else { + $s->{processes}->{$$}=$identification; + $s->on_start($$,$identification); + return 0; # Simulating the child which returns 0 + } +} + +sub finish { my ($s, $x)=@_; + if ( $s->{in_child} ) { + exit ($x || 0); + } + if ($s->{max_proc} == 0) { # max_proc == 0 + $s->on_finish($$, $x ,$s->{processes}->{$$}, 0, 0); + delete $s->{processes}->{$$}; + } + return 0; +} + +sub wait_children { my ($s)=@_; + return if !keys %{$s->{processes}}; + my $kid; + do { + $kid = $s->wait_one_child(&WNOHANG); + } while $kid > 0 || $kid < -1; # AS 5.6/Win32 returns negative PIDs +}; + +*wait_childs=*wait_children; # compatibility + +sub wait_one_child { my ($s,$par)=@_; + my $kid; + while (1) { + $kid = $s->_waitpid(-1,$par||=0); + last if $kid == 0 || $kid == -1; # AS 5.6/Win32 returns negative PIDs + redo if !exists $s->{processes}->{$kid}; + my $id = delete $s->{processes}->{$kid}; + $s->on_finish( $kid, $? >> 8 , $id, $? & 0x7f, $? & 0x80 ? 1 : 0); + last; + } + $kid; +}; + +sub wait_all_children { my ($s)=@_; + while (keys %{ $s->{processes} }) { + $s->on_wait; + $s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef); + }; +} + +*wait_all_childs=*wait_all_children; # compatibility; + +sub run_on_finish { my ($s,$code,$pid)=@_; + $s->{on_finish}->{$pid || 0}=$code; +} + +sub on_finish { my ($s,$pid,@par)=@_; + my $code=$s->{on_finish}->{$pid} || $s->{on_finish}->{0} or return 0; + $code->($pid,@par); +}; + +sub run_on_wait { my ($s,$code, $period)=@_; + $s->{on_wait}=$code; + $s->{on_wait_period} = $period; +} + +sub on_wait { my ($s)=@_; + if(ref($s->{on_wait}) eq 'CODE') { + $s->{on_wait}->(); + if (defined $s->{on_wait_period}) { + local $SIG{CHLD} = sub { } if ! defined $SIG{CHLD}; + select undef, undef, undef, $s->{on_wait_period} + }; + }; +}; + +sub run_on_start { my ($s,$code)=@_; + $s->{on_start}=$code; +} + +sub on_start { my ($s,@par)=@_; + $s->{on_start}->(@par) if ref($s->{on_start}) eq 'CODE'; +}; + +sub set_max_procs { my ($s, $mp)=@_; + $s->{max_proc} = $mp; +} + +# OS dependant code follows... + +sub _waitpid { # Call waitpid() in the standard Unix fashion. + return waitpid($_[1],$_[2]); +} + +# On ActiveState Perl 5.6/Win32 build 625, waitpid(-1, &WNOHANG) always +# blocks unless an actual PID other than -1 is given. +sub _NT_waitpid { my ($s, $pid, $par) = @_; + if ($par == &WNOHANG) { # Need to nonblock on each of our PIDs in the pool. + my @pids = keys %{ $s->{processes} }; + # Simulate -1 (no processes awaiting cleanup.) + return -1 unless scalar(@pids); + # Check each PID in the pool. + my $kid; + foreach $pid (@pids) { + $kid = waitpid($pid, $par); + return $kid if $kid != 0; # AS 5.6/Win32 returns negative PIDs. + } + return $kid; + } else { # Normal waitpid() call. + return waitpid($pid, $par); + } +} + +{ + local $^W = 0; + if ($^O eq 'NT' or $^O eq 'MSWin32') { + *_waitpid = \&_NT_waitpid; + } +} + +1;