602
|
1 |
# Copyright (c) 2000 Szabó, Balázs (dLux)
|
|
2 |
#
|
|
3 |
# All rights reserved. This program is free software; you can redistribute it
|
|
4 |
# and/or modify it under the same terms as Perl itself.
|
|
5 |
#
|
|
6 |
|
|
7 |
=head1 NAME
|
|
8 |
|
|
9 |
Parallel::ForkManager - A simple parallel processing fork manager
|
|
10 |
|
|
11 |
=head1 SYNOPSIS
|
|
12 |
|
|
13 |
use Parallel::ForkManager;
|
|
14 |
|
|
15 |
$pm = new Parallel::ForkManager($MAX_PROCESSES);
|
|
16 |
|
|
17 |
foreach $data (@all_data) {
|
|
18 |
# Forks and returns the pid for the child:
|
|
19 |
my $pid = $pm->start and next;
|
|
20 |
|
|
21 |
... do some work with $data in the child process ...
|
|
22 |
|
|
23 |
$pm->finish; # Terminates the child process
|
|
24 |
}
|
|
25 |
|
|
26 |
=head1 DESCRIPTION
|
|
27 |
|
|
28 |
This module is intended for use in operations that can be done in parallel
|
|
29 |
where the number of processes to be forked off should be limited. Typical
|
|
30 |
use is a downloader which will be retrieving hundreds/thousands of files.
|
|
31 |
|
|
32 |
The code for a downloader would look something like this:
|
|
33 |
|
|
34 |
use LWP::Simple;
|
|
35 |
use Parallel::ForkManager;
|
|
36 |
|
|
37 |
...
|
|
38 |
|
|
39 |
@links=(
|
|
40 |
["http://www.foo.bar/rulez.data","rulez_data.txt"],
|
|
41 |
["http://new.host/more_data.doc","more_data.doc"],
|
|
42 |
...
|
|
43 |
);
|
|
44 |
|
|
45 |
...
|
|
46 |
|
|
47 |
# Max 30 processes for parallel download
|
|
48 |
my $pm = new Parallel::ForkManager(30);
|
|
49 |
|
|
50 |
foreach my $linkarray (@links) {
|
|
51 |
$pm->start and next; # do the fork
|
|
52 |
|
|
53 |
my ($link,$fn) = @$linkarray;
|
|
54 |
warn "Cannot get $fn from $link"
|
|
55 |
if getstore($link,$fn) != RC_OK;
|
|
56 |
|
|
57 |
$pm->finish; # do the exit in the child process
|
|
58 |
}
|
|
59 |
$pm->wait_all_children;
|
|
60 |
|
|
61 |
First you need to instantiate the ForkManager with the "new" constructor.
|
|
62 |
You must specify the maximum number of processes to be created. If you
|
|
63 |
specify 0, then NO fork will be done; this is good for debugging purposes.
|
|
64 |
|
|
65 |
Next, use $pm->start to do the fork. $pm->start returns 0 for the child process,
|
|
66 |
and child pid for the parent process (see also L<perlfunc(1p)/fork()>).
|
|
67 |
The "and next" skips the internal loop in the parent process. NOTE:
|
|
68 |
$pm->start dies if the fork fails.
|
|
69 |
|
|
70 |
$pm->finish terminates the child process (assuming a fork was done in the
|
|
71 |
"start").
|
|
72 |
|
|
73 |
NOTE: You cannot use $pm->start if you are already in the child process.
|
|
74 |
If you want to manage another set of subprocesses in the child process,
|
|
75 |
you must instantiate another Parallel::ForkManager object!
|
|
76 |
|
|
77 |
=head1 METHODS
|
|
78 |
|
|
79 |
=over 5
|
|
80 |
|
|
81 |
=item new $processes
|
|
82 |
|
|
83 |
Instantiate a new Parallel::ForkManager object. You must specify the maximum
|
|
84 |
number of children to fork off. If you specify 0 (zero), then no children
|
|
85 |
will be forked. This is intended for debugging purposes.
|
|
86 |
|
|
87 |
=item start [ $process_identifier ]
|
|
88 |
|
|
89 |
This method does the fork. It returns the pid of the child process for
|
|
90 |
the parent, and 0 for the child process. If the $processes parameter
|
|
91 |
for the constructor is 0 then, assuming you're in the child process,
|
|
92 |
$pm->start simply returns 0.
|
|
93 |
|
|
94 |
An optional $process_identifier can be provided to this method... It is used by
|
|
95 |
the "run_on_finish" callback (see CALLBACKS) for identifying the finished
|
|
96 |
process.
|
|
97 |
|
|
98 |
=item finish [ $exit_code ]
|
|
99 |
|
|
100 |
Closes the child process by exiting and accepts an optional exit code
|
|
101 |
(default exit code is 0) which can be retrieved in the parent via callback.
|
|
102 |
If you use the program in debug mode ($processes == 0), this method doesn't
|
|
103 |
do anything.
|
|
104 |
|
|
105 |
=item set_max_procs $processes
|
|
106 |
|
|
107 |
Allows you to set a new maximum number of children to maintain. Returns
|
|
108 |
the previous setting.
|
|
109 |
|
|
110 |
=item wait_all_children
|
|
111 |
|
|
112 |
You can call this method to wait for all the processes which have been
|
|
113 |
forked. This is a blocking wait.
|
|
114 |
|
|
115 |
=back
|
|
116 |
|
|
117 |
=head1 CALLBACKS
|
|
118 |
|
|
119 |
You can define callbacks in the code, which are called on events like starting
|
|
120 |
a process or upon finish.
|
|
121 |
|
|
122 |
The callbacks can be defined with the following methods:
|
|
123 |
|
|
124 |
=over 4
|
|
125 |
|
|
126 |
=item run_on_finish $code [, $pid ]
|
|
127 |
|
|
128 |
You can define a subroutine which is called when a child is terminated. It is
|
|
129 |
called in the parent process.
|
|
130 |
|
|
131 |
The parameters of the $code are the following:
|
|
132 |
|
|
133 |
- pid of the process, which is terminated
|
|
134 |
- exit code of the program
|
|
135 |
- identification of the process (if provided in the "start" method)
|
|
136 |
- exit signal (0-127: signal number)
|
|
137 |
- core dump (1 if there was core dump at exit)
|
|
138 |
|
|
139 |
=item run_on_start $code
|
|
140 |
|
|
141 |
You can define a subroutine which is called when a child is started. It is called
|
|
142 |
after the successful startup of a child in the parent process.
|
|
143 |
|
|
144 |
The parameters of the $code are the following:
|
|
145 |
|
|
146 |
- pid of the process which has been started
|
|
147 |
- identification of the process (if provided in the "start" method)
|
|
148 |
|
|
149 |
=item run_on_wait $code, [$period]
|
|
150 |
|
|
151 |
You can define a subroutine which is called when the child process needs to wait
|
|
152 |
for the startup. If $period is not defined, then one call is done per
|
|
153 |
child. If $period is defined, then $code is called periodically and the
|
|
154 |
module waits for $period seconds between the two calls. Note, $period can be
|
|
155 |
a fractional number also. The exact "$period seconds" is not guaranteed;
|
|
156 |
signals can shorten and the process scheduler can make it longer (on busy
|
|
157 |
systems).
|
|
158 |
|
|
159 |
The $code is called in the "start" and the "wait_all_children" methods also.
|
|
160 |
|
|
161 |
No parameters are passed to the $code on the call.
|
|
162 |
|
|
163 |
=back
|
|
164 |
|
|
165 |
=head1 EXAMPLE
|
|
166 |
|
|
167 |
=head2 Parallel get
|
|
168 |
|
|
169 |
This small example can be used to get URLs in parallel.
|
|
170 |
|
|
171 |
use Parallel::ForkManager;
|
|
172 |
use LWP::Simple;
|
|
173 |
my $pm=new Parallel::ForkManager(10);
|
|
174 |
for my $link (@ARGV) {
|
|
175 |
$pm->start and next;
|
|
176 |
my ($fn)= $link =~ /^.*\/(.*?)$/;
|
|
177 |
if (!$fn) {
|
|
178 |
warn "Cannot determine filename from $link\n";
|
|
179 |
} else {
|
|
180 |
$0.=" ".$fn;
|
|
181 |
print "Getting $fn from $link\n";
|
|
182 |
my $rc=getstore($link,$fn);
|
|
183 |
print "$link downloaded. response code: $rc\n";
|
|
184 |
};
|
|
185 |
$pm->finish;
|
|
186 |
};
|
|
187 |
|
|
188 |
=head2 Callbacks
|
|
189 |
|
|
190 |
Example of a program using callbacks to get child exit codes:
|
|
191 |
|
|
192 |
use strict;
|
|
193 |
use Parallel::ForkManager;
|
|
194 |
|
|
195 |
my $max_procs = 5;
|
|
196 |
my @names = qw( Fred Jim Lily Steve Jessica Bob Dave Christine Rico Sara );
|
|
197 |
# hash to resolve PID's back to child specific information
|
|
198 |
|
|
199 |
my $pm = new Parallel::ForkManager($max_procs);
|
|
200 |
|
|
201 |
# Setup a callback for when a child finishes up so we can
|
|
202 |
# get its exit code
|
|
203 |
$pm->run_on_finish(
|
|
204 |
sub { my ($pid, $exit_code, $ident) = @_;
|
|
205 |
print "** $ident just got out of the pool ".
|
|
206 |
"with PID $pid and exit code: $exit_code\n";
|
|
207 |
}
|
|
208 |
);
|
|
209 |
|
|
210 |
$pm->run_on_start(
|
|
211 |
sub { my ($pid,$ident)=@_;
|
|
212 |
print "** $ident started, pid: $pid\n";
|
|
213 |
}
|
|
214 |
);
|
|
215 |
|
|
216 |
$pm->run_on_wait(
|
|
217 |
sub {
|
|
218 |
print "** Have to wait for one children ...\n"
|
|
219 |
},
|
|
220 |
0.5
|
|
221 |
);
|
|
222 |
|
|
223 |
foreach my $child ( 0 .. $#names ) {
|
|
224 |
my $pid = $pm->start($names[$child]) and next;
|
|
225 |
|
|
226 |
# This code is the child process
|
|
227 |
print "This is $names[$child], Child number $child\n";
|
|
228 |
sleep ( 2 * $child );
|
|
229 |
print "$names[$child], Child $child is about to get out...\n";
|
|
230 |
sleep 1;
|
|
231 |
$pm->finish($child); # pass an exit code to finish
|
|
232 |
}
|
|
233 |
|
|
234 |
print "Waiting for Children...\n";
|
|
235 |
$pm->wait_all_children;
|
|
236 |
print "Everybody is out of the pool!\n";
|
|
237 |
|
|
238 |
=head1 BUGS AND LIMITATIONS
|
|
239 |
|
|
240 |
Do not use Parallel::ForkManager in an environment, where other child
|
|
241 |
processes can affect the run of the main program, so using this module
|
|
242 |
is not recommended in an environment where fork() / wait() is already used.
|
|
243 |
|
|
244 |
If you want to use more than one copy of the Parallel::ForkManager, then
|
|
245 |
you have to make sure that all children processes are terminated, before you
|
|
246 |
use the second object in the main program.
|
|
247 |
|
|
248 |
You are free to use a new copy of Parallel::ForkManager in the child
|
|
249 |
processes, although I don't think it makes sense.
|
|
250 |
|
|
251 |
=head1 COPYRIGHT
|
|
252 |
|
|
253 |
Copyright (c) 2000 Szabó, Balázs (dLux)
|
|
254 |
|
|
255 |
All rights reserved. This program is free software; you can redistribute it
|
|
256 |
and/or modify it under the same terms as Perl itself.
|
|
257 |
|
|
258 |
=head1 AUTHOR
|
|
259 |
|
|
260 |
dLux (Szabó, Balázs) <dlux@kapu.hu>
|
|
261 |
|
|
262 |
=head1 CREDITS
|
|
263 |
|
|
264 |
Noah Robin <sitz@onastick.net> (documentation tweaks)
|
|
265 |
Chuck Hirstius <chirstius@megapathdsl.net> (callback exit status, example)
|
|
266 |
Grant Hopwood <hopwoodg@valero.com> (win32 port)
|
|
267 |
Mark Southern <mark_southern@merck.com> (bugfix)
|
|
268 |
|
|
269 |
=cut
|
|
270 |
|
|
271 |
package Parallel::ForkManager;
|
|
272 |
use POSIX ":sys_wait_h";
|
|
273 |
use strict;
|
|
274 |
use vars qw($VERSION);
|
|
275 |
$VERSION='0.7.5';
|
|
276 |
|
|
277 |
# Constructor. $processes is the maximum number of children to keep running
# at once; a false value (0) puts the manager in no-fork debug mode (see
# "start" and "finish"). May be called on a class name or on an existing
# object, in which case the new object is blessed into that object's class.
sub new {
    my ($class, $processes) = @_;
    $class = ref($class) || $class;

    my %state = (
        max_proc  => $processes,    # pool size limit
        processes => {},            # pid => identification of live children
        in_child  => 0,             # set to 1 inside a forked child
    );

    return bless \%state, $class;
}
|
|
285 |
|
|
286 |
# Fork a new child, waiting first if the pool is already full.
#
# $identification is an optional label stored alongside the pid and handed
# to the on_start / on_finish callbacks.
#
# Returns the child's pid in the parent and 0 in the child. When max_proc
# is false (debug mode) no fork happens and 0 is returned, so the caller
# continues as if it were the child. Dies if called from inside a child or
# if fork() fails.
sub start {
    my ($self, $identification) = @_;

    die "Cannot start another process while you are in the child process"
        if $self->{in_child};

    # Pool full: run the wait callback and reap one child per iteration.
    # With a wait period configured the reap is non-blocking (the callback
    # does the sleeping); otherwise it blocks until a child exits.
    while ($self->{max_proc}
        && scalar(keys %{ $self->{processes} }) >= $self->{max_proc})
    {
        $self->on_wait;
        my $flags = defined $self->{on_wait_period} ? WNOHANG() : undef;
        $self->wait_one_child($flags);
    }

    # Opportunistically reap anything else that has already exited.
    $self->wait_children;

    unless ($self->{max_proc}) {
        # Debug mode: register the current process as its own "child".
        $self->{processes}->{$$} = $identification;
        $self->on_start($$, $identification);
        return 0;    # Simulating the child which returns 0
    }

    my $pid = fork();
    die "Cannot fork: $!" if !defined $pid;

    if ($pid) {
        # Parent: remember the child and fire the start callback.
        $self->{processes}->{$pid} = $identification;
        $self->on_start($pid, $identification);
    }
    else {
        # Child: flag the object so nested start() calls are refused.
        $self->{in_child} = 1;
    }

    return $pid;
}
|
|
310 |
|
|
311 |
# End the current unit of work.
#
# In a forked child this exits the process with $x (0 when $x is false).
# In debug mode (max_proc == 0) nothing exits: the on_finish callback is
# invoked for the current pid with $x as the exit code, and the pid is then
# dropped from the process table. Otherwise it is a no-op.
# Returns 0 whenever it returns at all.
sub finish {
    my ($self, $x) = @_;

    exit($x || 0) if $self->{in_child};

    return 0 if $self->{max_proc} != 0;

    # max_proc == 0: simulate the child's termination in-process.
    my $ident = $self->{processes}->{$$};
    $self->on_finish($$, $x, $ident, 0, 0);
    delete $self->{processes}->{$$};

    return 0;
}
|
|
321 |
|
|
322 |
# Reap, without blocking, every tracked child that has already exited.
# Does nothing when no children are being tracked.
sub wait_children {
    my ($self) = @_;

    return if !keys %{ $self->{processes} };

    my $reaped;
    do {
        $reaped = $self->wait_one_child(WNOHANG());
    } until $reaped <= 0 && $reaped >= -1;    # AS 5.6/Win32 returns negative PIDs
}

*wait_childs = *wait_children;    # compatibility
|
|
331 |
|
|
332 |
# Wait for one tracked child to finish.
#
# $par is the flags value passed to waitpid (undef/0 = blocking,
# WNOHANG = non-blocking). Pids that are not in the process table are
# ignored and the wait is retried. For a tracked pid, the entry is removed
# and the on_finish callback is called with: pid, exit code ($? >> 8),
# identification, signal number ($? & 0x7f) and core-dump flag.
#
# Returns the reaped pid, or the 0 / -1 that waitpid returned when there
# was nothing to reap.
sub wait_one_child {
    my ($self, $par) = @_;
    $par ||= 0;

    my $kid;
    while (1) {
        $kid = $self->_waitpid(-1, $par);
        last if $kid == 0 || $kid == -1;    # AS 5.6/Win32 returns negative PIDs

        # Not one of ours: keep waiting.
        next unless exists $self->{processes}->{$kid};

        my $ident  = delete $self->{processes}->{$kid};
        my $status = $?;
        $self->on_finish(
            $kid,
            $status >> 8,
            $ident,
            $status & 0x7f,
            $status & 0x80 ? 1 : 0,
        );
        last;
    }

    $kid;
}
|
|
344 |
|
|
345 |
# Block until every tracked child has been reaped.
#
# Each pass runs the on_wait callback and then reaps one child: without
# blocking when a wait period is configured (on_wait does the sleeping),
# blocking otherwise.
sub wait_all_children {
    my ($self) = @_;

    while (keys %{ $self->{processes} }) {
        $self->on_wait;
        my $flags = defined $self->{on_wait_period} ? WNOHANG() : undef;
        $self->wait_one_child($flags);
    }
}

*wait_all_childs = *wait_all_children;    # compatibility;
|
|
353 |
|
|
354 |
# Register $code as the callback run when a child terminates.
# With $pid the callback applies to that pid only; without it (or with a
# false $pid) it is stored under key 0, which on_finish uses as fallback.
# Returns $code.
sub run_on_finish {
    my ($self, $code, $pid) = @_;
    my $slot = $pid || 0;
    return $self->{on_finish}->{$slot} = $code;
}
|
|
357 |
|
|
358 |
# Invoke the finish callback for $pid, passing it $pid followed by @par.
# A callback registered for that specific pid wins over the default one
# stored under key 0. Returns 0 when no callback is registered, otherwise
# whatever the callback returns.
sub on_finish {
    my ($self, $pid, @par) = @_;

    my $callback = $self->{on_finish}->{$pid} || $self->{on_finish}->{0};
    return 0 unless $callback;

    return $callback->($pid, @par);
}
|
|
362 |
|
|
363 |
# Register $code as the callback run while the manager is waiting for a
# child. $period (seconds, may be undef) is stored as the pause on_wait
# takes after each call. Returns $period.
sub run_on_wait {
    my ($self, $code, $period) = @_;
    $self->{on_wait} = $code;
    return $self->{on_wait_period} = $period;
}
|
|
367 |
|
|
368 |
# Run the wait callback, if one is registered, with no arguments.
# When a wait period is configured, sleep for that many (possibly
# fractional) seconds afterwards using a four-argument select.
sub on_wait {
    my ($self) = @_;

    my $callback = $self->{on_wait};
    if (ref($callback) eq 'CODE') {
        $callback->();

        my $period = $self->{on_wait_period};
        if (defined $period) {
            # Temporarily install a no-op CHLD handler when none is set --
            # presumably so that a child's exit interrupts the sleep early.
            # Kept as a statement modifier so the localisation lasts until
            # the end of this block.
            local $SIG{CHLD} = sub { } if ! defined $SIG{CHLD};
            select undef, undef, undef, $period
        }
    }
}
|
|
377 |
|
|
378 |
# Register $code as the callback run in the parent after a child has been
# started. Returns $code.
sub run_on_start {
    my ($self, $code) = @_;
    return $self->{on_start} = $code;
}
|
|
381 |
|
|
382 |
# Invoke the start callback with @par (pid and identification when called
# from "start"). Does nothing unless a code reference has been registered.
sub on_start {
    my ($self, @par) = @_;

    my $callback = $self->{on_start};
    if (ref($callback) eq 'CODE') {
        $callback->(@par);
    }
}
|
|
385 |
|
|
386 |
# Set a new maximum number of children to maintain.
#
# $mp is the new limit; 0 switches the manager to no-fork debug mode.
# Returns the previous setting, as the METHODS documentation promises
# (the assignment used to be the last statement, so the new value was
# returned instead).
sub set_max_procs {
    my ($self, $mp) = @_;

    my $previous = $self->{max_proc};
    $self->{max_proc} = $mp;

    return $previous;
}
|
|
389 |
|
|
390 |
# OS dependent code follows...
|
|
391 |
|
|
392 |
# Default waitpid wrapper: call waitpid() in the standard Unix fashion.
# Invoked as a method; the invocant is ignored. Returns waitpid's result.
sub _waitpid {
    my (undef, $pid, $flags) = @_;
    return waitpid($pid, $flags);
}
|
|
395 |
|
|
396 |
# On ActiveState Perl 5.6/Win32 build 625, waitpid(-1, &WNOHANG) always
# blocks unless an actual PID other than -1 is given.
#
# Replacement for _waitpid on those platforms. For a non-blocking request
# it polls every pid in the pool individually and returns the first
# non-zero result (-1 when the pool is empty, 0 when no child has exited).
# Any other request is passed straight through to waitpid().
sub _NT_waitpid {
    my ($self, $pid, $par) = @_;

    # Normal waitpid() call.
    return waitpid($pid, $par) unless $par == WNOHANG();

    # Need to nonblock on each of our PIDs in the pool.
    my @pool = keys %{ $self->{processes} };

    # Simulate -1 (no processes awaiting cleanup.)
    return -1 if !@pool;

    # Check each PID in the pool.
    my $kid;
    for my $candidate (@pool) {
        $kid = waitpid($candidate, $par);
        return $kid if $kid != 0;    # AS 5.6/Win32 returns negative PIDs.
    }

    return $kid;
}
|
|
414 |
|
|
415 |
# Select the waitpid implementation at load time: on Windows builds the
# per-pid polling variant replaces the default _waitpid.
{
    # Warnings are switched off for this block -- presumably to silence
    # the "subroutine redefined" notice from the glob assignment.
    local $^W = 0;

    *_waitpid = \&_NT_waitpid
        if $^O eq 'NT' || $^O eq 'MSWin32';
}
|
|
421 |
|
|
422 |
1;
|