|
1 # Copyright (c) 2003-2009 Nokia Corporation and/or its subsidiary(-ies). |
|
2 # All rights reserved. |
|
3 # This component and the accompanying materials are made available |
|
4 # under the terms of "Eclipse Public License v1.0" |
|
5 # which accompanies this distribution, and is available |
|
6 # at the URL "http://www.eclipse.org/legal/epl-v10.html". |
|
7 # |
|
8 # Initial Contributors: |
|
9 # Nokia Corporation - initial contribution. |
|
10 # |
|
11 # Contributors: |
|
12 # |
|
13 # Description: |
|
14 # |
|
15 |
|
16 package Msg; |
|
17 use strict; |
|
18 use IO::Select; |
|
19 use IO::Socket; |
|
20 use Carp; |
|
21 |
|
# Event-loop bookkeeping shared by every connection in this package.
# The hashes map a handle to its callback; the IO::Select sets hold the
# handles that are currently being watched.
use vars qw ( %scan_retrieves %publish_retrieves $scan_manages $publish_manages);

%scan_retrieves    = ();                    # read callbacks, keyed by handle
%publish_retrieves = ();                    # write callbacks, keyed by handle
$scan_manages      = IO::Select->new();     # handles watched for reading
$publish_manages   = IO::Select->new();     # handles watched for writing

# Capability flag consulted by the blocking/non-blocking helpers.
# NOTE(review): this assignment runs when the module body executes, which
# is after the BEGIN block below has already run at compile time, so the
# flag is 0 again by the time any sub is called. Left as-is on purpose:
# changing it would switch on code paths that are currently never taken.
my $obstructing_maintained = 0;

# Running total of connections (outgoing and accepted).
my $AllAssociations = 0;

BEGIN {
    # Checks if blocking is supported: try to load POSIX and pull in the
    # fcntl/errno constants; any failure leaves the flag untouched.
    my $posix_loaded = eval {
        require POSIX;
        POSIX->import(qw (F_SETFL O_NONBLOCK EAGAIN));
        1;
    };
    $obstructing_maintained = 1 if $posix_loaded;
}

use Socket qw(SO_KEEPALIVE SOL_SOCKET);

# TCP keepalive option numbers passed to setsockopt at the IPPROTO_TCP level.
# NOTE(review): presumably the Linux values - confirm for other platforms.
use constant TCP_KEEPIDLE  => 4;    # Start keepalives after this period
use constant TCP_KEEPINTVL => 5;    # Interval between keepalives
use constant TCP_KEEPCNT   => 6;    # Number of keepalives before death
|
45 |
|
# AllAssociations
#
# Inputs
#   (none)
#
# Outputs
#   Current value of the package-wide connection counter
#
# Description
#   Accessor for the counter that associate() and _recent_node()
#   increment and unplug() decrements.
sub AllAssociations {
    return $AllAssociations;
}
|
58 |
|
# associate
#
# Inputs
#   $collection         (Package the connection object is blessed into)
#   $toChange           (Port number to connect to)
#   $toReceiver         (Host to connect to)
#   $get_notice_process (Optional function to call on receiving data)
#
# Outputs
#   The connection object, or undef when the socket could not be created
#
# Description
#   Connects the client to the server over TCP, enables keepalive on the
#   socket and, when a receive callback is supplied, registers the socket
#   with the event loop for reading.
sub associate {
    my ($collection, $toChange, $toReceiver, $get_notice_process) = @_;

    # Create a new internet socket. The option is spelled "Timeout";
    # the previous "TimeOut" spelling was not recognised by IO::Socket,
    # so the intended 10 second connect timeout never applied.
    my $link = IO::Socket::INET->new(
        PeerAddr => $toReceiver,
        PeerPort => $toChange,
        Proto    => 'tcp',
        Timeout  => 10,
        Reuse    => 1);

    return undef unless $link;

    # Set KeepAlive: first probe after 30s idle, then every 30s, give up
    # after 2 unanswered probes. Failures are ignored (best effort).
    # Socket::IPPROTO_TCP() is called with explicit parentheses so the
    # caller's @_ is not forwarded to it.
    my $tcp_level = Socket::IPPROTO_TCP();
    setsockopt($link, SOL_SOCKET, SO_KEEPALIVE,  pack("l", 1));
    setsockopt($link, $tcp_level, TCP_KEEPIDLE,  pack("l", 30));
    setsockopt($link, $tcp_level, TCP_KEEPCNT,   pack("l", 2));
    setsockopt($link, $tcp_level, TCP_KEEPINTVL, pack("l", 30));

    # Increase the total connection count
    $AllAssociations++;

    # Create a connection end-point object
    my $asso = bless {
        sock                   => $link,
        rcvd_notification_proc => $get_notice_process,
    }, $collection;

    # Set up the callback to the receive function (deferred mode)
    if ($get_notice_process) {
        my $retrieve = sub { _get($asso, 0) };
        define_result_manager($link, "read" => $retrieve);
    }
    $asso;
}
|
107 |
|
# unplug
#
# Inputs
#   $asso (Connection object)
#
# Outputs
#   Result of close() on the socket; nothing when the connection had no
#   socket left
#
# Description
#   Disconnects a connection and cleans up: removes the socket from the
#   connection object, deregisters its read and write callbacks and
#   closes it. Calling it again on the same connection is a no-op.
sub unplug {
    my $asso = shift;

    # Detach the socket; nothing to do if it was already detached
    my $link = delete $asso->{sock};
    return unless defined($link);

    # Decrease the number of total connections. Done only after the check
    # above so that a repeated unplug cannot drive the counter too low.
    $AllAssociations--;

    # Set to not try and check for reads and writes of this socket
    define_result_manager($link, "write" => undef, "read" => undef);
    close($link);
}
|
130 |
|
# transmit_immediately
#
# Inputs
#   $asso    (Connection object)
#   $content (Message to send)
#
# Outputs
#   Whatever _transmit returns for a flush
#
# Description
#   Queues the message and then flushes the whole queue straight away.
#   The flush requests blocking mode, so this call may wait until the
#   socket is writeable.
sub transmit_immediately {
    my $asso    = shift;
    my $content = shift;

    # Queue first, then drain everything that is queued (1 ==> flush)
    _lineup($asso, $content);
    return $asso->_transmit(1);
}
|
149 |
|
# transmit_afterwards
#
# Inputs
#   $asso    (Connection object)
#   $content (Message to send)
#
# Outputs
#   Nothing meaningful
#
# Description
#   Queues the message for sending at a later time instead of writing it
#   now. A write callback is registered with the event loop so that the
#   queue is sent once the socket is reported writeable. The message is
#   queued even when the connection no longer has a socket.
sub transmit_afterwards {
    my $asso    = shift;
    my $content = shift;

    # Puts the message in the queue
    _lineup($asso, $content);

    # Without a socket there is nothing to register
    my $sock = $asso->{sock};
    return unless defined $sock;

    # Deferred send (0 ==> do not flush) once the socket is writeable
    my $send_when_ready = sub { $asso->_transmit(0) };
    define_result_manager($sock, "write" => $send_when_ready);
}
|
172 |
|
# _lineup
#
# Inputs
#   $asso    (Connection object)
#   $content (Message to send)
#
# Outputs
#   Number of entries in the queue after the push
#
# Description
#   Private function that frames a message and appends it to the
#   connection's send queue. The frame is the payload length packed as a
#   4-byte network-order long, followed by the payload itself.
sub _lineup {
    my ($asso, $content) = @_;

    # Length prefix (network long) + payload
    my $frame = pack('N', length($content)) . $content;
    push @{ $asso->{queue} }, $frame;
}
|
191 |
|
# _transmit
#
# Inputs
#   $asso   (Connection object)
#   $remove (True: flush the whole queue; false: deferred mode)
#
# Outputs
#   1 on success or when the write would block and will be resumed,
#   0 after a send error, nothing when the connection has no socket
#
# Description
#   Private function that writes queued frames to the socket.
#   With $remove set, blocking mode is requested and every queued message
#   is sent; the function returns early only on an error.
#   With $remove false, non-blocking mode is requested and at most one
#   message is completed per call. If a write would block part-way
#   through, the offset reached is remembered in {send_offset} so the next
#   call resumes from there.
sub _transmit {
    my ($asso, $remove) = @_;

    my $sock = $asso->{sock};
    return unless defined $sock;
    my $pending = $asso->{queue};

    if ($remove) {
        $asso->define_obstructing();
    }
    else {
        $asso->define_not_obstructing();
    }

    # Resume point inside the message at the head of the queue
    my $offset = exists $asso->{send_offset} ? $asso->{send_offset} : 0;

    # Loop through the messages in the queue
    MESSAGE:
    while (@$pending) {
        my $frame     = $pending->[0];
        my $remaining = length($frame) - $offset;

        while ($remaining) {
            my $written = syswrite($sock, $frame, $remaining, $offset);

            unless (defined $written) {
                if (_faultwillObstruct($!)) {
                    # Should happen only in deferred mode. Record how much
                    # has been sent; the write handler should already be
                    # registered, so sending resumes on the next callback.
                    $asso->{send_offset} = $offset;
                    return 1;
                }
                # Hard error: report it and leave the message in the queue
                $asso->manage_transmitted_fault($!);
                return 0;
            }

            $offset    += $written;
            $remaining -= $written;
        }

        # Head message fully written: drop it and reset the resume point
        delete $asso->{send_offset};
        $offset = 0;
        shift @$pending;

        # Deferred mode goes back to select and waits for it to fire again
        last MESSAGE unless $remove;
    }

    # Ask to be called back if the queue has not been drained, otherwise
    # stop watching the socket for writeability.
    my $next_call = @$pending ? sub { $asso->_transmit(0) } : undef;
    define_result_manager($sock, "write" => $next_call);

    return 1;    # Success
}
|
255 |
|
# _faultwillObstruct
#
# Inputs
#   $_[0] (Error value to classify, compared numerically)
#
# Outputs
#   True when the error equals EAGAIN and the capability flag is set,
#   otherwise 0
#
# Description
#   Private function that decides whether a failed read or write merely
#   means "would block" rather than a real error.
sub _faultwillObstruct {
    # Without the POSIX constants nothing can be classified as "would block"
    return 0 unless $obstructing_maintained;

    return ($_[0] == EAGAIN());
}
|
271 |
|
# define_not_obstructing
#
# Inputs
#   $_[0] (Connection object; a bare socket handle is accepted as well)
#
# Outputs
#   Result of the final fcntl call; nothing when no change was attempted
#
# Description
#   Sets non-blocking mode on a connection's socket while preserving its
#   other fcntl flags. Does nothing unless the capability flag says the
#   POSIX constants were loaded.
sub define_not_obstructing {                        # $asso->define_not_obstructing
    return unless $obstructing_maintained;

    # This is invoked as a method on the connection object, so the
    # descriptor lives in {sock}; fcntl cannot operate on the hash itself.
    my $target = $_[0];
    my $handle = UNIVERSAL::isa($target, 'HASH') ? $target->{sock} : $target;
    return unless defined $handle;

    # Constants are package-qualified because F_GETFL is not among the
    # names this module imports from POSIX.
    my $pins = fcntl($handle, POSIX::F_GETFL(), 0);
    return unless defined $pins;                    # could not read the flags

    # preserve other fcntl flags
    fcntl($handle, POSIX::F_SETFL(), $pins | POSIX::O_NONBLOCK());
}
|
288 |
|
# define_obstructing
#
# Inputs
#   $_[0] (Connection object; a bare socket handle is accepted as well)
#
# Outputs
#   Result of the final fcntl call; nothing when no change was attempted
#
# Description
#   Sets blocking mode on a connection's socket by clearing O_NONBLOCK
#   while preserving its other fcntl flags. Does nothing unless the
#   capability flag says the POSIX constants were loaded.
sub define_obstructing {                            # $asso->define_obstructing
    return unless $obstructing_maintained;

    # This is invoked as a method on the connection object, so the
    # descriptor lives in {sock}; fcntl cannot operate on the hash itself.
    my $target = $_[0];
    my $handle = UNIVERSAL::isa($target, 'HASH') ? $target->{sock} : $target;
    return unless defined $handle;

    # Constants are package-qualified because F_GETFL is not among the
    # names this module imports from POSIX.
    my $pins = fcntl($handle, POSIX::F_GETFL(), 0);
    return unless defined $pins;                    # could not read the flags

    $pins &= ~POSIX::O_NONBLOCK();  # Clear non-blocking, but preserve other flags
    fcntl($handle, POSIX::F_SETFL(), $pins);
}
|
305 |
|
# manage_transmitted_fault
#
# Inputs
#   $asso          (Connection object)
#   $fault_content (Error message)
#
# Outputs
#   Nothing meaningful
#
# Description
#   Default reaction to a send error: emits a warning and stops watching
#   the connection's socket for writeability. For more meaningful handling
#   of send errors, subclass Msg and rebless $asso.
sub manage_transmitted_fault {
    my $asso          = shift;
    my $fault_content = shift;

    warn "Error while sending: $fault_content \n";

    # Drop the write callback so the event loop stops retrying the send
    define_result_manager($asso->{sock}, "write" => undef);
}
|
323 |
|
324 #----------------------------------------------------------------- |
|
325 # Receive side routines |
|
326 |
|
# recent_agent
#
# Inputs
#   $collection        (Package)
#   $changes           (Reference to the list of port numbers to try)
#   $mi_receiver       (Hostname of the interface to use)
#   $enter_process     (Reference to function to call when accepting a connection)
#   $iAssociationBreak (Optional seconds to sleep after a failed attempt, default 0)
#   $PlugAssociations  (Optional number of passes over the port list, default -1 = retry forever)
#
# Outputs
#   Nothing meaningful; dies when the retry limit is reached
#
# Description
#   Creates the listening socket. The ports in $changes are tried in
#   rotation (the list is rotated in place) until one can be bound. The
#   socket is then given keepalive settings and registered with the event
#   loop so that incoming connections are handed to _recent_node.
#   If a listening socket already exists it is reused.
my ($g_enter_process,$g_collection);
my $primary_plug = 0;
sub recent_agent {
    @_ >= 4 || die "Msg->recent_agent (myhost, myport, login_proc)\n";
    my ($collection, $changes, $mi_receiver, $enter_process, $iAssociationBreak, $PlugAssociations) = @_;

    # Set a default Socket timeout value
    $iAssociationBreak = 0 if (!defined $iAssociationBreak);
    # Set a default Socket retry to be forever
    $PlugAssociations = -1 if (!defined $PlugAssociations);

    # Number of bind attempts so far; starts at 0 rather than undef so the
    # arithmetic below is well defined.
    my $RepeatNumber = 0;

    while (!$primary_plug) {
        # Check to see if there is a retry limit and if the limit has been reached
        if ($PlugAssociations != -1) {
            # One "attempt" is a full pass over the port list. Guard the
            # divisor so an empty list cannot cause a division by zero.
            my $port_count = scalar(@$changes) || 1;
            my $passes     = $RepeatNumber / $port_count;
            if ($passes >= $PlugAssociations) {
                die "ERROR: could not create socket after ".$passes." attempts";
            }
            # Increment the number of retries
            $RepeatNumber++;
        }

        # Try the first port on the list, then place it on the back of the queue
        my $mi_change = shift(@$changes);
        push @$changes, $mi_change;

        print "Using port number $mi_change\n";
        # NOTE(review): IO::Socket spells this option "Timeout"; "TimeOut"
        # is not recognised and therefore has no effect. It is kept as-is
        # because enabling it would also put a timeout on accept().
        $primary_plug = IO::Socket::INET->new(
            LocalAddr => $mi_receiver,
            LocalPort => $mi_change,
            Listen    => 5,
            Proto     => 'tcp',
            TimeOut   => 10,
            Reuse     => 1);
        sleep $iAssociationBreak if (!$primary_plug);
    }

    # Set KeepAlive: first probe after 30s idle, then every 30s, give up
    # after 2 unanswered probes. Failures are ignored (best effort).
    my $tcp_level = Socket::IPPROTO_TCP();
    setsockopt($primary_plug, SOL_SOCKET, SO_KEEPALIVE,  pack("l", 1));
    setsockopt($primary_plug, $tcp_level, TCP_KEEPIDLE,  pack("l", 30));
    setsockopt($primary_plug, $tcp_level, TCP_KEEPCNT,   pack("l", 2));
    setsockopt($primary_plug, $tcp_level, TCP_KEEPINTVL, pack("l", 30));

    # Add the socket to the list of filehandles to read from.
    define_result_manager($primary_plug, "read" => \&_recent_node);
    # Store the package name and login proc for later use
    $g_enter_process = $enter_process; $g_collection = $collection;
}
|
391 |
|
# get_immediately
#
# Inputs
#   $asso (Connection object)
#
# Outputs
#   List context: (message, error); scalar context: the message only
#
# Description
#   Reads one message from the connection right away through _get instead
#   of waiting for the event loop to deliver it.
sub get_immediately {
    my ($asso) = @_;

    my @reply = _get($asso, 1);    # 1 ==> rcv now

    # Always hand back exactly two values in list context, even when _get
    # returned nothing because the connection has no socket.
    return wantarray ? @reply[0, 1] : $reply[0];
}
|
397 |
|
# _get
#
# Inputs
#   $asso            (Connection object)
#   $get_immediately (True: return the message to the caller; false:
#                     deliver it to the connection's receive callback)
#
# Outputs
#   Immediate mode: (message, error). Deferred mode: whatever the receive
#   callback returns, or nothing when the read would block and is resumed
#   later. Nothing when the connection has no socket.
#
# Description
#   Complement to _transmit. Reads one length-prefixed message: a 4-byte
#   network-order length followed by that many bytes of payload. A read
#   that would block saves the partial payload in {msg} and the
#   outstanding byte count in {bytes_to_read}, and is resumed on the next
#   call. An empty result is treated as a disconnect and the connection is
#   unplugged before the caller or callback is notified.
sub _get {
    my ($asso, $get_immediately) = @_;
    my $link = $asso->{sock};
    return unless defined($link);

    # Find out how much has already been received, if at all
    my ($content, $branch, $sequencetoScan) = ("", 0, 0);
    if (exists $asso->{msg}) {
        $content = delete $asso->{msg};     # take over the partial payload
        # New data is appended directly after what is already held; an
        # offset of length-1 would overwrite the last received byte.
        $branch         = length($content);
        $sequencetoScan = $asso->{bytes_to_read};
    }

    # We want to read the message length in blocking mode. Quite
    # unlikely that we'll get blocked too long reading 4 bytes
    my $header_read = 1;
    if (!$sequencetoScan) {                 # Get new length
        my $storage;
        $asso->define_obstructing();
        # $! is only meaningful after a failure, so clear it first and
        # judge success by sysread's return value, not by a stale errno.
        $! = 0;
        my $sequence_scan = sysread($link, $storage, 4);
        if (defined($sequence_scan) && $sequence_scan == 4) {
            $sequencetoScan = unpack('N', $storage);
        }
        else {
            $header_read = 0;               # EOF, short read or error
        }
    }

    if ($header_read) {
        $asso->define_not_obstructing() unless $get_immediately;
        while ($sequencetoScan) {
            $! = 0;
            my $sequence_scan = sysread($link, $content, $sequencetoScan, $branch);
            if (!defined($sequence_scan)) {
                if (_faultwillObstruct($!)) {
                    # Should come here only in non-blocking mode.
                    # _get will be called later when the socket is
                    # readable again.
                    $asso->{msg}           = $content;
                    $asso->{bytes_to_read} = $sequencetoScan;
                    return;
                }
                last;                       # real error: give up on this message
            }
            last if $sequence_scan == 0;    # peer closed the connection
            $sequencetoScan -= $sequence_scan;
            $branch         += $sequence_scan;
        }
    }

    # Capture the error before the cleanup below can disturb $!
    my $fault = $!;

    # Nothing received at all means the peer went away
    if (length($content) == 0) {
        $asso->unplug();
    }
    if ($get_immediately) {
        return ($content, $fault);
    }
    else {
        $asso->{rcvd_notification_proc}->($asso, $content, $fault);
    }
}
|
458 |
|
# _recent_node
#
# Inputs
#   (none used; runs as the read callback of the listening socket)
#
# Outputs
#   Nothing meaningful
#
# Description
#   Accepts a pending connection on the listening socket, wraps it in a
#   connection object blessed into the package stored by recent_agent and
#   passes it to the stored login function together with the peer's host
#   and port. If the login function returns a receive callback, the new
#   socket is registered for reading; otherwise the connection is
#   unplugged again.
sub _recent_node {
    my $link = $primary_plug->accept();
    # accept() can fail (for example when the peer has already gone away);
    # in that case there is no connection to count or to set up.
    return unless $link;

    $AllAssociations++;
    my $asso = bless {
        'sock'  => $link,
        'state' => 'connected'
    }, $g_collection;

    my $get_notice_process =
        $g_enter_process->($asso, $link->peerhost(), $link->peerport());

    if ($get_notice_process) {
        $asso->{rcvd_notification_proc} = $get_notice_process;
        my $retrieve = sub { _get($asso, 0) };
        define_result_manager($link, "read" => $retrieve);
    }
    else {    # Login failed
        $asso->unplug();
    }
}
|
476 |
|
477 #---------------------------------------------------- |
|
478 # Event loop routines used by both client and server |
|
479 |
|
# define_result_manager
#
# Inputs
#   $manage     (Handle to watch; a leading package name is skipped)
#   %parameters ("write" and/or "read", each mapped to a callback or to a
#                false value)
#
# Outputs
#   Nothing meaningful
#
# Description
#   Registers or removes event-loop callbacks for a handle. For every key
#   that is present, a true value stores the callback and adds the handle
#   to the matching select set, while a false value removes both. Keys
#   that are absent leave the existing registration untouched.
sub define_result_manager {
    shift unless ref($_[0]);    # shift if first arg is package name
    my ($manage, %parameters) = @_;

    # Same treatment for both directions; "write" is handled before "read"
    my @directions = (
        [ 'write', \%publish_retrieves, $publish_manages ],
        [ 'read',  \%scan_retrieves,    $scan_manages ],
    );

    for my $direction (@directions) {
        my ($kind, $callbacks, $watched) = @$direction;
        next unless exists $parameters{$kind};

        my $retrieve = $parameters{$kind};
        if ($retrieve) {
            $callbacks->{$manage} = $retrieve;
            $watched->add($manage);
        }
        else {
            delete $callbacks->{$manage};
            $watched->remove($manage);
        }
    }
}
|
505 |
|
# result_iteration
#
# Inputs
#   $collection          (Package; not used)
#   $starting_scan_break (Optional timeout in seconds for the first wait only)
#   $iteration_number    (Optional number of rounds to process, e.g. 1 to
#                         process events once)
#
# Outputs
#   Nothing meaningful
#
# Description
#   The event loop. Waits for registered handles to become readable or
#   writeable and invokes their callbacks with the handle as argument.
#   The loop ends when no handles are registered any more, when the first
#   wait times out, or when the requested number of rounds has been
#   processed.
sub result_iteration {
    my ($collection, $starting_scan_break, $iteration_number) = @_;

    while (1) {
        # Quit the loop if no handles left to process
        last unless ($scan_manages->count() || $publish_manages->count());

        # An undefined timeout makes select wait indefinitely, so a single
        # call serves both the timed first round and all later rounds.
        my $timed_round = defined $starting_scan_break;
        my ($readable, $writeable) =
            IO::Select->select ($scan_manages, $publish_manages, undef, $starting_scan_break);

        if ($timed_round) {
            # On the initial round, expect activity within the timeout;
            # if there is none, give up.
            if (!defined $readable) {
                print "WARNING: no response from server within $starting_scan_break seconds\n";
                last;
            }
            # Unset initial timeout
            $starting_scan_break = undef;
        }

        # A callback may deregister other handles, hence the exists checks
        for my $handle (@$readable) {
            $scan_retrieves{$handle}->($handle) if exists $scan_retrieves{$handle};
        }
        for my $handle (@$writeable) {
            $publish_retrieves{$handle}->($handle) if exists $publish_retrieves{$handle};
        }

        if (defined($iteration_number)) {
            last unless --$iteration_number;
        }
    }
}
|
537 |
|
538 1; |
|
539 |
|
540 __END__ |
|
541 |