|
1 # Copyright (c) 2000 Szabó Balázs (dLux) |
|
2 # |
|
3 # All rights reserved. This program is free software; you can redistribute it |
|
4 # and/or modify it under the same terms as Perl itself. |
|
5 # |
|
6 |
|
7 =head1 NAME |
|
8 |
|
9 Parallel::ForkManager - A simple parallel processing fork manager |
|
10 |
|
11 =head1 SYNOPSIS |
|
12 |
|
13 use Parallel::ForkManager; |
|
14 |
|
15 $pm = new Parallel::ForkManager($MAX_PROCESSES); |
|
16 |
|
17 foreach $data (@all_data) { |
|
18 # Forks and returns the pid for the child: |
|
19 my $pid = $pm->start and next; |
|
20 |
|
21 ... do some work with $data in the child process ... |
|
22 |
|
23 $pm->finish; # Terminates the child process |
|
24 } |
|
25 |
|
26 =head1 DESCRIPTION |
|
27 |
|
28 This module is intended for use in operations that can be done in parallel |
|
29 where the number of processes to be forked off should be limited. Typical |
|
30 use is a downloader which will be retrieving hundreds/thousands of files. |
|
31 |
|
32 The code for a downloader would look something like this: |
|
33 |
|
34 use LWP::Simple; |
|
35 use Parallel::ForkManager; |
|
36 |
|
37 ... |
|
38 |
|
39 @links=( |
|
40 ["http://www.foo.bar/rulez.data","rulez_data.txt"], |
|
41 ["http://new.host/more_data.doc","more_data.doc"], |
|
42 ... |
|
43 ); |
|
44 |
|
45 ... |
|
46 |
|
47 # Max 30 processes for parallel download |
|
48 my $pm = new Parallel::ForkManager(30); |
|
49 |
|
50 foreach my $linkarray (@links) { |
|
51 $pm->start and next; # do the fork |
|
52 |
|
53 my ($link,$fn) = @$linkarray; |
|
54 warn "Cannot get $fn from $link" |
|
55 if getstore($link,$fn) != RC_OK; |
|
56 |
|
57 $pm->finish; # do the exit in the child process |
|
58 } |
|
59 $pm->wait_all_children; |
|
60 |
|
61 First you need to instantiate the ForkManager with the "new" constructor. |
|
62 You must specify the maximum number of processes to be created. If you |
|
63 specify 0, then NO fork will be done; this is good for debugging purposes. |
|
64 |
|
65 Next, use $pm->start to do the fork. $pm->start returns 0 for the child process, |
|
66 and child pid for the parent process (see also L<perlfunc(1p)/fork()>). |
|
67 The "and next" skips the internal loop in the parent process. NOTE: |
|
68 $pm->start dies if the fork fails. |
|
69 |
|
70 $pm->finish terminates the child process (assuming a fork was done in the |
|
71 "start"). |
|
72 |
|
73 NOTE: You cannot use $pm->start if you are already in the child process. |
|
74 If you want to manage another set of subprocesses in the child process, |
|
75 you must instantiate another Parallel::ForkManager object! |
|
76 |
|
77 =head1 METHODS |
|
78 |
|
79 =over 5 |
|
80 |
|
81 =item new $processes |
|
82 |
|
83 Instantiate a new Parallel::ForkManager object. You must specify the maximum |
|
84 number of children to fork off. If you specify 0 (zero), then no children |
|
85 will be forked. This is intended for debugging purposes. |
|
86 |
|
87 =item start [ $process_identifier ] |
|
88 |
|
89 This method does the fork. It returns the pid of the child process for |
|
90 the parent, and 0 for the child process. If the $processes parameter |
|
91 for the constructor is 0 then, assuming you're in the child process, |
|
92 $pm->start simply returns 0. |
|
93 |
|
94 An optional $process_identifier can be provided to this method... It is used by |
|
95 the "run_on_finish" callback (see CALLBACKS) for identifying the finished |
|
96 process. |
|
97 |
|
98 =item finish [ $exit_code ] |
|
99 |
|
100 Closes the child process by exiting and accepts an optional exit code |
|
101 (default exit code is 0) which can be retrieved in the parent via callback. |
|
102 If you use the program in debug mode ($processes == 0), this method doesn't |
|
103 do anything. |
|
104 |
|
105 =item set_max_procs $processes |
|
106 |
|
107 Allows you to set a new maximum number of children to maintain. Returns |
|
108 the previous setting. |
|
109 |
|
110 =item wait_all_children |
|
111 |
|
112 You can call this method to wait for all the processes which have been |
|
113 forked. This is a blocking wait. |
|
114 |
|
115 =back |
|
116 |
|
117 =head1 CALLBACKS |
|
118 |
|
119 You can define callbacks in the code, which are called on events like starting |
|
120 a process or upon finish. |
|
121 |
|
122 The callbacks can be defined with the following methods: |
|
123 |
|
124 =over 4 |
|
125 |
|
126 =item run_on_finish $code [, $pid ] |
|
127 |
|
128 You can define a subroutine which is called when a child is terminated. It is |
|
129 called in the parent process. |
|
130 |
|
131 The parameters of the $code are the following: |
|
132 |
|
133 - pid of the process, which is terminated |
|
134 - exit code of the program |
|
135 - identification of the process (if provided in the "start" method) |
|
136 - exit signal (0-127: signal number) |
|
137 - core dump (1 if there was core dump at exit) |
|
138 |
|
139 =item run_on_start $code |
|
140 |
|
141 You can define a subroutine which is called when a child is started. It is called |
|
142 after the successful startup of a child in the parent process. |
|
143 |
|
144 The parameters of the $code are the following: |
|
145 |
|
146 - pid of the process which has been started |
|
147 - identification of the process (if provided in the "start" method) |
|
148 |
|
149 =item run_on_wait $code, [$period] |
|
150 |
|
151 You can define a subroutine which is called when the child process needs to wait |
|
152 for the startup. If $period is not defined, then one call is done per |
|
153 child. If $period is defined, then $code is called periodically and the |
|
154 module waits for $period seconds between the two calls. Note, $period can be |
|
155 a fractional number also. The exact "$period seconds" is not guaranteed, |
|
156 signals can shorten and the process scheduler can make it longer (on busy |
|
157 systems). |
|
158 |
|
159 The $code is called in both the "start" and the "wait_all_children" methods. |
|
160 |
|
161 No parameters are passed to the $code on the call. |
|
162 |
|
163 =back |
|
164 |
|
165 =head1 EXAMPLE |
|
166 |
|
167 =head2 Parallel get |
|
168 |
|
169 This small example can be used to get URLs in parallel. |
|
170 |
|
171 use Parallel::ForkManager; |
|
172 use LWP::Simple; |
|
173 my $pm=new Parallel::ForkManager(10); |
|
174 for my $link (@ARGV) { |
|
175 $pm->start and next; |
|
176 my ($fn)= $link =~ /^.*\/(.*?)$/; |
|
177 if (!$fn) { |
|
178 warn "Cannot determine filename from $link\n"; |
|
179 } else { |
|
180 $0.=" ".$fn; |
|
181 print "Getting $fn from $link\n"; |
|
182 my $rc=getstore($link,$fn); |
|
183 print "$link downloaded. response code: $rc\n"; |
|
184 }; |
|
185 $pm->finish; |
|
186 }; |
|
187 |
|
188 =head2 Callbacks |
|
189 |
|
190 Example of a program using callbacks to get child exit codes: |
|
191 |
|
192 use strict; |
|
193 use Parallel::ForkManager; |
|
194 |
|
195 my $max_procs = 5; |
|
196 my @names = qw( Fred Jim Lily Steve Jessica Bob Dave Christine Rico Sara ); |
|
197 # hash to resolve PID's back to child specific information |
|
198 |
|
199 my $pm = new Parallel::ForkManager($max_procs); |
|
200 |
|
201 # Setup a callback for when a child finishes up so we can |
|
202 # get its exit code |
|
203 $pm->run_on_finish( |
|
204 sub { my ($pid, $exit_code, $ident) = @_; |
|
205 print "** $ident just got out of the pool ". |
|
206 "with PID $pid and exit code: $exit_code\n"; |
|
207 } |
|
208 ); |
|
209 |
|
210 $pm->run_on_start( |
|
211 sub { my ($pid,$ident)=@_; |
|
212 print "** $ident started, pid: $pid\n"; |
|
213 } |
|
214 ); |
|
215 |
|
216 $pm->run_on_wait( |
|
217 sub { |
|
218 print "** Have to wait for one children ...\n" |
|
219 }, |
|
220 0.5 |
|
221 ); |
|
222 |
|
223 foreach my $child ( 0 .. $#names ) { |
|
224 my $pid = $pm->start($names[$child]) and next; |
|
225 |
|
226 # This code is the child process |
|
227 print "This is $names[$child], Child number $child\n"; |
|
228 sleep ( 2 * $child ); |
|
229 print "$names[$child], Child $child is about to get out...\n"; |
|
230 sleep 1; |
|
231 $pm->finish($child); # pass an exit code to finish |
|
232 } |
|
233 |
|
234 print "Waiting for Children...\n"; |
|
235 $pm->wait_all_children; |
|
236 print "Everybody is out of the pool!\n"; |
|
237 |
|
238 =head1 BUGS AND LIMITATIONS |
|
239 |
|
240 Do not use Parallel::ForkManager in an environment, where other child |
|
241 processes can affect the run of the main program, so using this module |
|
242 is not recommended in an environment where fork() / wait() is already used. |
|
243 |
|
244 If you want to use more than one copy of Parallel::ForkManager, then |
|
245 you have to make sure that all children processes are terminated, before you |
|
246 use the second object in the main program. |
|
247 |
|
248 You are free to use a new copy of Parallel::ForkManager in the child |
|
249 processes, although I don't think it makes sense. |
|
250 |
|
251 =head1 COPYRIGHT |
|
252 |
|
253 Copyright (c) 2000 Szabó, Balázs (dLux) |
|
254 |
|
255 All rights reserved. This program is free software; you can redistribute it |
|
256 and/or modify it under the same terms as Perl itself. |
|
257 |
|
258 =head1 AUTHOR |
|
259 |
|
260 dLux (Szabó, Balázs) <dlux@kapu.hu> |
|
261 |
|
262 =head1 CREDITS |
|
263 |
|
264 Noah Robin <sitz@onastick.net> (documentation tweaks) |
|
265 Chuck Hirstius <chirstius@megapathdsl.net> (callback exit status, example) |
|
266 Grant Hopwood <hopwoodg@valero.com> (win32 port) |
|
267 Mark Southern <mark_southern@merck.com> (bugfix) |
|
268 |
|
269 =cut |
|
270 |
|
271 package Parallel::ForkManager; |
|
272 use POSIX ":sys_wait_h"; |
|
273 use strict; |
|
274 use vars qw($VERSION); |
|
275 $VERSION='0.7.5'; |
|
276 |
|
# Constructor.
#
#   $c         - class name, or an existing object whose class is reused
#   $processes - maximum number of simultaneously running children; a false
#                value puts the manager in "no fork" (debug) mode
#
# Returns a blessed hash holding the limit, the pid => identifier pool of
# running children, and the flag telling whether we are inside a child.
sub new {
    my ($c, $processes) = @_;

    my $class = ref($c) || $c;
    my %state = (
        max_proc  => $processes,
        processes => {},
        in_child  => 0,
    );

    return bless \%state, $class;
}
|
285 |
|
# Start a new child, blocking first while the pool is full.
#
#   $identification - optional tag stored with the child's pid and handed
#                     to the on_start / on_finish callbacks
#
# Returns the child's pid in the parent and 0 in the child. When max_proc is
# false no fork happens: the current process is registered under its own pid
# and 0 is returned, as if we were the child.
# Dies when called from inside a child, or when fork() fails.
sub start {
    my ($s, $identification) = @_;

    if ($s->{in_child}) {
        die "Cannot start another process while you are in the child process";
    }

    # Throttle: keep reaping until there is room for one more child.
    until (!$s->{max_proc}
        || scalar(keys %{ $s->{processes} }) < $s->{max_proc})
    {
        $s->on_wait;
        $s->wait_one_child(defined $s->{on_wait_period} ? WNOHANG() : undef);
    }

    # Collect any children that have already exited, without blocking.
    $s->wait_children;

    unless ($s->{max_proc}) {
        # Debug mode: pretend to be the child without forking.
        $s->{processes}->{$$} = $identification;
        $s->on_start($$, $identification);
        return 0;    # Simulating the child which returns 0
    }

    my $pid = fork();
    defined $pid or die "Cannot fork: $!";

    if (!$pid) {
        # Child side: remember it so a nested start() is refused.
        $s->{in_child} = 1;
    }
    else {
        # Parent side: track the child and notify the start callback.
        $s->{processes}->{$pid} = $identification;
        $s->on_start($pid, $identification);
    }

    return $pid;
}
|
310 |
|
# End the current child.
#
#   $x - optional exit code; defaults to 0
#
# Inside a forked child this exits the process with the given code and does
# not return. In debug mode (max_proc false, so start() did not fork) the
# process keeps running: the on_finish callback is invoked for the current
# pid with the exit code, the stored identifier, and 0 for both the signal
# and the core-dump flag, and the pid is then removed from the pool.
# Returns 0 whenever it returns at all.
sub finish {
    my ($s, $x) = @_;

    # Apply the documented default once, so the debug-mode callback sees 0
    # rather than undef when no exit code was given.
    my $exit_code = $x || 0;

    exit($exit_code) if $s->{in_child};

    # Same truthiness test start() uses to decide not to fork; also avoids
    # an "uninitialized" warning when max_proc was never set.
    if (!$s->{max_proc}) {
        $s->on_finish($$, $exit_code, $s->{processes}->{$$}, 0, 0);
        delete $s->{processes}->{$$};
    }

    return 0;
}
|
321 |
|
# Reap, without blocking, every child that has already exited.
# Does nothing when the pool is empty. Keeps calling wait_one_child in
# non-blocking mode for as long as it reports a reaped pid.
sub wait_children {
    my ($s) = @_;

    return if !keys %{ $s->{processes} };

    while (1) {
        my $reaped = $s->wait_one_child(WNOHANG());

        # AS 5.6/Win32 returns negative PIDs, so anything below -1 also
        # counts as a reaped child; 0 and -1 mean there is nothing to collect.
        last unless $reaped > 0 || $reaped < -1;
    }
}

*wait_childs = *wait_children;    # compatibility
|
331 |
|
# Wait for one child from our pool to terminate.
#
#   $par - flags for waitpid (e.g. WNOHANG); a false value becomes 0
#
# Pids reported by waitpid that are not in the pool are skipped and the wait
# is repeated. For a pool member, the entry is removed and on_finish is
# called with: pid, exit code, identifier, signal number, core-dump flag.
# Returns the reaped pid, or the 0 / -1 that waitpid produced when there was
# nothing to collect.
sub wait_one_child {
    my ($s, $par) = @_;

    $par ||= 0;

    for (;;) {
        my $kid = $s->_waitpid(-1, $par);

        # AS 5.6/Win32 returns negative PIDs, so only 0 and -1 mean "none".
        return $kid if $kid == 0 || $kid == -1;

        # Some other process's child: ignore it and wait again.
        next unless exists $s->{processes}->{$kid};

        my $status = $?;
        my $id     = delete $s->{processes}->{$kid};
        $s->on_finish(
            $kid,
            $status >> 8,                  # exit code
            $id,
            $status & 0x7f,                # terminating signal
            ($status & 0x80) ? 1 : 0,      # core dump flag
        );

        return $kid;
    }
}
|
344 |
|
# Wait until the pool of running children is empty.
# Before each wait the on_wait hook is run. When a wait period is configured
# the wait itself is non-blocking (WNOHANG); otherwise wait_one_child is
# called without flags.
sub wait_all_children {
    my ($s) = @_;

    while (1) {
        last unless keys %{ $s->{processes} };

        $s->on_wait;
        $s->wait_one_child(defined $s->{on_wait_period} ? WNOHANG() : undef);
    }
}

*wait_all_childs = *wait_all_children;    # compatibility
|
353 |
|
# Register the callback run in the parent when a child terminates.
#
#   $code - the callback
#   $pid  - optional; register only for this pid. Without it (or with a
#           false value) the callback goes into slot 0, the default that
#           on_finish falls back to.
#
# Returns the stored callback.
sub run_on_finish {
    my ($s, $code, $pid) = @_;

    my $slot = $pid ? $pid : 0;

    return $s->{on_finish}->{$slot} = $code;
}
|
357 |
|
# Fire the finish callback for a terminated child.
#
#   $pid - pid of the child
#   @par - remaining callback arguments, passed through after the pid
#
# The callback registered for this pid is preferred; otherwise the default
# one in slot 0 is used. Returns 0 when neither exists, else whatever the
# callback returns.
sub on_finish {
    my ($s, $pid, @par) = @_;

    my $code = $s->{on_finish}->{$pid};
    $code = $s->{on_finish}->{0} unless $code;
    return 0 unless $code;

    return $code->($pid, @par);
}
|
362 |
|
# Register the callback run whenever the manager has to wait for a child.
#
#   $code   - the callback
#   $period - optional number of seconds (may be fractional) to sleep after
#             each call; when defined, waits become non-blocking polls
#
# Returns the stored period.
sub run_on_wait {
    my ($s, $code, $period) = @_;

    @{$s}{qw(on_wait on_wait_period)} = ($code, $period);

    return $s->{on_wait_period};
}
|
367 |
|
# Run the "waiting" hook, then pause for the configured period.
# Nothing happens unless a code reference was registered with run_on_wait.
# The period is read after the callback has run, and the pause is a
# four-argument select() used purely as a sub-second sleep.
sub on_wait {
    my ($s) = @_;

    my $callback = $s->{on_wait};
    if (ref($callback) eq 'CODE') {
        $callback->();

        my $period = $s->{on_wait_period};
        if (defined $period) {
            # Install a no-op CHLD handler for the duration of the sleep
            # when none is set; the module documentation notes that signals
            # can shorten the wait.
            local $SIG{CHLD} = sub { } unless defined $SIG{CHLD};
            select(undef, undef, undef, $period);
        }
    }
}
|
377 |
|
# Register the callback run in the parent right after a child has been
# started. Returns the stored callback.
sub run_on_start {
    my ($s, $code) = @_;

    return $s->{on_start} = $code;
}
|
381 |
|
# Fire the start callback, passing @par straight through.
# Silently does nothing when no code reference has been registered.
sub on_start {
    my ($s, @par) = @_;

    my $callback = $s->{on_start};
    if (ref($callback) eq 'CODE') {
        $callback->(@par);
    }
}
|
385 |
|
# Change the maximum number of children to maintain.
#
#   $mp - the new limit
#
# Returns the previous limit, as the module documentation promises. The
# assignment alone would evaluate to the new value, so the old one is saved
# before it is overwritten.
sub set_max_procs {
    my ($s, $mp) = @_;

    my $previous = $s->{max_proc};
    $s->{max_proc} = $mp;

    return $previous;
}
|
389 |
|
390 # OS dependent code follows... |
|
391 |
|
# Call waitpid() in the standard Unix fashion.
# Invoked as a method; the invocant is not used.
#
#   $pid   - pid to wait for (-1 for any child)
#   $flags - waitpid flags
#
# Returns whatever waitpid returns.
sub _waitpid {
    my (undef, $pid, $flags) = @_;

    return waitpid($pid, $flags);
}
|
395 |
|
# On ActiveState Perl 5.6/Win32 build 625, waitpid(-1, &WNOHANG) always
# blocks unless an actual PID other than -1 is given.
#
# Replacement for _waitpid on those platforms.
#
#   $pid - pid to wait for; only used for blocking waits
#   $par - waitpid flags
#
# For a non-blocking wait each pid in the pool is polled in turn and the
# first non-zero answer is returned; 0 is returned when every poll gave 0,
# and -1 when the pool is empty. Any other flags go straight to waitpid.
sub _NT_waitpid {
    my ($s, $pid, $par) = @_;

    # Normal waitpid() call.
    return waitpid($pid, $par) unless $par == WNOHANG();

    # Need to nonblock on each of our PIDs in the pool.
    my @pool = keys %{ $s->{processes} };

    # Simulate -1 (no processes awaiting cleanup.)
    return -1 if !@pool;

    my $answer;
    for my $candidate (@pool) {
        $answer = waitpid($candidate, $par);
        last if $answer != 0;    # AS 5.6/Win32 returns negative PIDs.
    }

    return $answer;
}
|
414 |
|
# On Windows builds of Perl, route _waitpid through the polling
# implementation. Warnings are switched off for the duration of the block
# because the glob assignment replaces an already-defined sub.
{
    local $^W = 0;

    my $on_windows = ($^O eq 'NT' || $^O eq 'MSWin32');
    *_waitpid = \&_NT_waitpid if $on_windows;
}
|
421 |
|
422 1; |