changeset 662 60be34e1b006
parent 655 3f65fd25dfd4
equal deleted inserted replaced
654:7c11c3d8d025 662:60be34e1b006
     1 # Copyright (c) 2003-2009 Nokia Corporation and/or its subsidiary(-ies).
     2 # All rights reserved.
     3 # This component and the accompanying materials are made available
     4 # under the terms of "Eclipse Public License v1.0"
     5 # which accompanies this distribution, and is available
     6 # at the URL "".
     7 #
     8 # Initial Contributors:
     9 # Nokia Corporation - initial contribution.
    10 #
    11 # Contributors:
    12 #
    13 # Description:
    14 #
    16 package Msg;
    17 use strict;
    18 use IO::Select;
    19 use IO::Socket;
    20 use Carp;
    22 use vars qw ( %scan_retrieves %publish_retrieves $scan_manages $publish_manages);
    24  %scan_retrieves = ();
    25 %publish_retrieves = ();
    26 $scan_manages   = IO::Select->new();
    27 $publish_manages   = IO::Select->new();
    28 my $obstructing_maintained = 0;
    30 my $AllAssociations = 0;
    33 BEGIN {
    34     # Checks if blocking is supported
    35     eval {
    36         require POSIX; POSIX->import(qw (F_SETFL O_NONBLOCK EAGAIN));
    37     };
    38     $obstructing_maintained = 1 unless $@;
    39 }
    41 use Socket qw(SO_KEEPALIVE SOL_SOCKET);
    42 use constant TCP_KEEPIDLE  => 4; # Start keeplives after this period
    43 use constant TCP_KEEPINTVL => 5; # Interval between keepalives
    44 use constant TCP_KEEPCNT   => 6; # Number of keepalives before death
    46 # AllAssociations
    47 #
    48 # Inputs
    49 #
    50 # Outputs
    51 #
    52 # Description
    53 # This function returns the total number of connections
    54 sub AllAssociations
    55 {
    56   return $AllAssociations;
    57 }
    59 # associate
    60 #
    61 # Inputs
    62 # $collection
    63 # $toReceiver (Host associate to)
    64 # $toChange (Port number to associate to)
    65 # $get_notice_process (Function to call on recieving data)
    66 #
    67 # Outputs
    68 #
    69 # Description
    70 # This function connects the client to the server
    71 sub associate {
    72     my ($collection, $toChange, $toReceiver, $get_notice_process) = @_;
    74     # Create a new internet socket
    76     my $link = IO::Socket::INET->new (
    77                                       PeerAddr => $toReceiver,
    78                                       PeerPort => $toChange,
    79                                       Proto    => 'tcp',
    80                                       TimeOut => 10,
    81                                       Reuse    => 1);
    83     return undef unless $link;
    85     # Set KeepAlive
    86     setsockopt($link, SOL_SOCKET, SO_KEEPALIVE,  pack("l", 1));
    87     setsockopt($link, &Socket::IPPROTO_TCP, TCP_KEEPIDLE,  pack("l", 30));
    88     setsockopt($link, &Socket::IPPROTO_TCP, TCP_KEEPCNT,   pack("l", 2));
    89     setsockopt($link, &Socket::IPPROTO_TCP, TCP_KEEPINTVL, pack("l", 30));
    91     # Increse the total connection count
    92     $AllAssociations++;
    94     # Create a connection end-point object
    95     my $asso = bless {
    96         sock                   => $link,
    97         rcvd_notification_proc => $get_notice_process,
    98     }, $collection;
   100       # Set up the callback to the rcv function
   101     if ($get_notice_process) {
   102         my $retrieve = sub {_get($asso, 0)};
   103         define_result_manager ($link, "read" => $retrieve);
   104     }
   105     $asso;
   106 }
   108 # unplug
   109 #
   110 # Inputs
   111 # $asso (Connection object)
   112 #
   113 # Outputs
   114 #
   115 # Description
   116 # This function disconnects a connection and cleans up
   117 sub unplug {
   118     my $asso = shift;
   120     # Decrease the number of total connections
   121     $AllAssociations--;
   123     # Delete the socket
   124     my $link = delete $asso->{sock};
   125     return unless defined($link);
   126     # Set to not try and check for reads and writes of this socket
   127     define_result_manager ($link, "write" => undef, "read" => undef);
   128     close($link);
   129 }
   131 # transmit_immediately
   132 #
   133 # Inputs
   134 # $asso (Connection object)
   135 # $content (Message to send)
   136 #
   137 # Outputs
   138 #
   139 # Description
   140 # This function does a immediate send, this will block if the socket is not writeable
   141 sub transmit_immediately {
   142     my ($asso, $content) = @_;
   144     # Puts the message in the queue
   145     _lineup ($asso, $content);
   146     # Flushes the queue
   147     $asso->_transmit (1); # 1 ==> flush
   148 }
   150 # transmit_afterwards
   151 #
   152 # Inputs
   153 # $asso (Connection object)
   154 # $content (Message to send)
   155 #
   156 # Outputs
   157 #
   158 # Description
   159 # This function does a sends at a later time, does not block if the socket is not writeable.
   160 # It sets a callback to send the data in the queue when the socket is writeable
   161 sub transmit_afterwards {
   162     my ($asso, $content) = @_;
   164     # Puts the message in the queue
   165     _lineup($asso, $content);
   166     # Get the current socket
   167     my $link = $asso->{sock};
   168     return unless defined($link);
   169     # Sets the callback to send the data when the socket is writeable
   170     define_result_manager ($link, "write" => sub {$asso->_transmit(0)});
   171 }
   173 # _lineup
   174 #
   175 # Inputs
   176 # $asso (Connection object)
   177 # $content (Message to send)
   178 #
   179 # Outputs
   180 #
   181 # Description
   182 # This is a private function to place the message on the queue for this socket
   183 sub _lineup {
   184     my ($asso, $content) = @_;
   185     # prepend length (encoded as network long)
   186     my $dist = length($content);
   187     # Stores the length as a network long in the first 4 bytes of the message
   188     $content = pack ('N', $dist) . $content; 
   189     push (@{$asso->{queue}}, $content);
   190 }
# _transmit
#
# Inputs
# $asso (Connection object)
# $remove (1 = flush the whole queue in blocking mode,
#          0 = deferred mode, send at most one message per call)
#
# Outputs
# 1 on success or when the write would block, 0 on a send error,
# nothing if the connection has no socket
#
# Description
# This is a private function that sends the queued data.  A partially
# written message is resumed on the next call using the offset stored in
# $asso->{send_offset}.
sub _transmit {
    my ($asso, $remove) = @_;
    my $link = $asso->{sock};
    return unless defined($link);
    # Reference to the connection's queue of framed messages (see _lineup)
    my ($Lrq) = $asso->{queue};

    # If $remove is set, set the socket to blocking, and send all
    # messages in the queue - return only if there's an error
    # If $remove is 0 (deferred mode) make the socket non-blocking, and
    # return to the event loop only after every message, or if it
    # is likely to block in the middle of a message.

    $remove ? $asso->define_obstructing() : $asso->define_not_obstructing();
    # Resume point inside the first queued message, 0 for a fresh message
    my $branch = (exists $asso->{send_offset}) ? $asso->{send_offset} : 0;

    # Loop through the messages in the queue
    while (@$Lrq) {
        my $content            = $Lrq->[0];
        my $sequencetoPublish = length($content) - $branch;
        my $sequence_published  = 0;
        # Keep writing until every remaining byte of this message is sent
        while ($sequencetoPublish) {
            $sequence_published = syswrite ($link, $content,
                                       $sequencetoPublish, $branch);
            if (!defined($sequence_published)) {
                if (_faultwillObstruct($!)) {
                    # Should happen only in deferred mode. Record how
                    # much we have already sent.
                    $asso->{send_offset} = $branch;
                    # Event handler should already be set, so we will
                    # be called back eventually, and will resume sending
                    return 1;
                } else {    # Uh, oh
                    $asso->manage_transmitted_fault($!);
                    return 0; # fail. Message remains in queue ..
                }
            }
            $branch         += $sequence_published;
            $sequencetoPublish -= $sequence_published;
        }
        # Message fully written: forget the resume point and dequeue it
        delete $asso->{send_offset};
        $branch = 0;
        shift @$Lrq;
        last unless $remove; # Go back to select and wait
                            # for it to fire again.
    }
    # Call me back if queue has not been drained.
    if (@$Lrq) {
        define_result_manager ($link, "write" => sub {$asso->_transmit(0)});
    } else {
        define_result_manager ($link, "write" => undef);
    }
    1;  # Success
}
   256 # _faultwillObstruct
   257 #
   258 # Inputs
   259 # $asso (Connection object)
   260 #
   261 # Outputs
   262 #
   263 # Description
   264 # This is a private function processes the blocking error message
   265 sub _faultwillObstruct {
   266     if ($obstructing_maintained) {
   267         return ($_[0] == EAGAIN());
   268     }
   269     return 0;
   270 }
   272 # define_not_obstructing
   273 #
   274 # Inputs
   275 # $_[0] (Connection socket)
   276 #
   277 # Outputs
   278 #
   279 # Description
   280 # This is a function set non-blocking on a socket
   281 sub define_not_obstructing {                        # $asso->define_obstructing
   282     if ($obstructing_maintained) {
   283         # preserve other fcntl flags
   284         my $pins = fcntl ($_[0], F_GETFL(), 0);
   285         fcntl ($_[0], F_SETFL(), $pins | O_NONBLOCK());
   286     }
   287 }
   289 # define_obstructing
   290 #
   291 # Inputs
   292 # $_[0] (Connection socket)
   293 #
   294 # Outputs
   295 #
   296 # Description
   297 # This is a function set blocking on a socket
   298 sub define_obstructing {
   299     if ($obstructing_maintained) {
   300         my $pins = fcntl ($_[0], F_GETFL(), 0);
   301         $pins  &= ~O_NONBLOCK(); # Clear blocking, but preserve other flags
   302         fcntl ($_[0], F_SETFL(), $pins);
   303     }
   304 }
   306 # manage_transmitted_fault
   307 #
   308 # Inputs
   309 # $asso (Connection object)
   310 # $fault_content (Error message)
   311 #
   312 # Outputs
   313 #
   314 # Description
   315 # This is a function warns on send errors and removes the socket from list of writable sockets
   316 sub manage_transmitted_fault {
   317    # For more meaningful handling of send errors, subclass Msg and
   318    # rebless $asso.  
   319    my ($asso, $fault_content) = @_;
   320    warn "Error while sending: $fault_content \n";
   321    define_result_manager ($asso->{sock}, "write" => undef);
   322 }
   324 #-----------------------------------------------------------------
   325 # Receive side routines
   327 # recent_agent
   328 #
   329 # Inputs
   330 # $collection (Package)
   331 # $mi_receiver (Hostname of the interface to use)
   332 # $mi_change (Port number to listen on)
   333 # $enter_process (Reference to function to call when accepting a connection)
   334 #
   335 # Outputs
   336 #
   337 # Description
   338 # This is a function create a listening socket
   339 my ($g_enter_process,$g_collection);
   340 my $primary_plug = 0;
   341 sub recent_agent {
   342     @_ >= 4 || die "Msg->recent_agent (myhost, myport, login_proc)\n";
   343     my ($RepeatNumber);
   344     my ($collection, $changes, $mi_receiver, $enter_process, $iAssociationBreak, $PlugAssociations) = @_;
   345     # Set a default Socket timeout value
   346     $iAssociationBreak = 0 if (!defined $iAssociationBreak);
   347     # Set a default Socket retry to be forever
   348     $PlugAssociations = -1 if (!defined $PlugAssociations);
   350     while(!$primary_plug)
   351     {
   352         #Check to see if there is a retry limit and if the limit has been reached
   353         if ($PlugAssociations != -1)
   354         {
   355             if (($RepeatNumber / scalar(@$changes)) >= $PlugAssociations)
   356             {
   357                 die "ERROR: could not create socket after ".$RepeatNumber / scalar(@$changes)." attempts";            
   358             } else {
   359                 # Increment the number of retries
   360                 $RepeatNumber++;
   361             }
   362         }
   364         #Try the first port on the list
   365         my $mi_change = shift(@$changes);
   366         #Place the port on the back of the queue
   367         push @$changes,$mi_change;
   369         print "Using port number $mi_change\n";
   370         $primary_plug = IO::Socket::INET->new (
   371                                               LocalAddr => $mi_receiver,
   372                                               LocalPort => $mi_change,
   373                                               Listen    => 5,
   374                                               Proto     => 'tcp',
   375                                               TimeOut =>    10,
   376                                               Reuse     => 1);
   377         sleep $iAssociationBreak if (!$primary_plug);
   378     }
   380     # Set KeepAlive
   381     setsockopt($primary_plug, SOL_SOCKET, SO_KEEPALIVE,  pack("l", 1));
   382     setsockopt($primary_plug, &Socket::IPPROTO_TCP, TCP_KEEPIDLE,  pack("l", 30));
   383     setsockopt($primary_plug, &Socket::IPPROTO_TCP, TCP_KEEPCNT,   pack("l", 2));
   384     setsockopt($primary_plug, &Socket::IPPROTO_TCP, TCP_KEEPINTVL, pack("l", 30));
   386     # Add the socket to the list on filehandles to read from.
   387     define_result_manager ($primary_plug, "read" => \&_recent_node);
   388     # Store the package name and login proc for later use
   389     $g_enter_process = $enter_process; $g_collection = $collection;
   390 }
   392 sub get_immediately {
   393     my ($asso) = @_;
   394     my ($content, $fault) = _get ($asso, 1); # 1 ==> rcv now
   395     return wantarray ? ($content, $fault) : $content;
   396 }
   398 sub _get {                     # Complement to _transmit
   399     my ($asso, $get_immediately) = @_; # $get_immediately complement of $remove
   400     # Find out how much has already been received, if at all
   401     my ($content, $branch, $sequencetoScan, $sequence_scan);
   402     my $link = $asso->{sock};
   403     return unless defined($link);
   404     if (exists $asso->{msg}) {
   405         $content           = $asso->{msg};
   406         $branch        = length($content) - 1;  # sysread appends to it.
   407         $sequencetoScan = $asso->{bytes_to_read};
   408         delete $asso->{'msg'};              # have made a copy
   409     } else {
   410         # The typical case ...
   411         $content           = "";                # Otherwise -w complains 
   412         $branch        = 0 ;  
   413         $sequencetoScan = 0 ;                # Will get set soon
   414     }
   415     # We want to read the message length in blocking mode. Quite
   416     # unlikely that we'll get blocked too long reading 4 bytes
   417     if (!$sequencetoScan)  {                 # Get new length 
   418         my $storage;
   419         $asso->define_obstructing();
   420         $sequence_scan = sysread($link, $storage, 4);
   421         if ($! || ($sequence_scan != 4)) {
   422             goto FINISH;
   423         }
   424         $sequencetoScan = unpack ('N', $storage);
   425     }
   426     $asso->define_not_obstructing() unless $get_immediately;
   427     while ($sequencetoScan) {
   428         $sequence_scan = sysread ($link, $content, $sequencetoScan, $branch);
   429         if (defined ($sequence_scan)) {
   430             if ($sequence_scan == 0) {
   431                 last;
   432             }
   433             $sequencetoScan -= $sequence_scan;
   434             $branch        += $sequence_scan;
   435         } else {
   436             if (_faultwillObstruct($!)) {
   437                 # Should come here only in non-blocking mode
   438                 $asso->{msg}           = $content;
   439                 $asso->{bytes_to_read} = $sequencetoScan;
   440                 return ;   # .. _get will be called later
   441                            # when socket is readable again
   442             } else {
   443                 last;
   444             }
   445         }
   446     }
   448   FINISH:
   449     if (length($content) == 0) {
   450         $asso->unplug();
   451     }
   452     if ($get_immediately) {
   453         return ($content, $!);
   454     } else {
   455         &{$asso->{rcvd_notification_proc}}($asso, $content, $!);
   456     }
   457 }
   459 sub _recent_node {
   460     my $link = $primary_plug->accept();
   461     $AllAssociations++;
   462     my $asso = bless {
   463         'sock' =>  $link,
   464         'state' => 'connected'
   465     }, $g_collection;
   466     my $get_notice_process =
   467         &$g_enter_process ($asso, $link->peerhost(), $link->peerport());
   468     if ($get_notice_process) {
   469         $asso->{rcvd_notification_proc} = $get_notice_process;
   470         my $retrieve = sub {_get($asso,0)};
   471         define_result_manager ($link, "read" => $retrieve);
   472     } else {  # Login failed
   473         $asso->unplug();
   474     }
   475 }
   477 #----------------------------------------------------
   478 # Event loop routines used by both client and server
   480 sub define_result_manager {
   481     shift unless ref($_[0]); # shift if first arg is package name
   482     my ($manage, %parameters) = @_;
   483     my $retrieve;
   484     if (exists $parameters{'write'}) {
   485         $retrieve = $parameters{'write'};
   486         if ($retrieve) {
   487             $publish_retrieves{$manage} = $retrieve;
   488             $publish_manages->add($manage);
   489         } else {
   490             delete $publish_retrieves{$manage};
   491             $publish_manages->remove($manage);
   492         }
   493     }
   494     if (exists $parameters{'read'}) {
   495         $retrieve = $parameters{'read'};
   496         if ($retrieve) {
   497             $scan_retrieves{$manage} = $retrieve;
   498             $scan_manages->add($manage);
   499         } else {
   500             delete $scan_retrieves{$manage};
   501             $scan_manages->remove($manage);
   502        }
   503     }
   504 }
   506 sub result_iteration {
   507     my ($collection, $starting_scan_break, $iteration_number) = @_; # result_iteration(1) to process events once
   508     my ($asso, $scan, $publish, $scandefine, $publishdefine);
   509     while (1) {
   510         # Quit the loop if no handles left to process
   511         last unless ($scan_manages->count() || $publish_manages->count());
   512         if (defined $starting_scan_break)
   513         {
   514             ($scandefine, $publishdefine) = IO::Select->select ($scan_manages, $publish_manages, undef, $starting_scan_break);
   515             # On initial timeout a read expect a read within timeout if not disconnect
   516             if (!defined $scandefine)
   517             {
   518               print "WARNING: no response from server within $starting_scan_break seconds\n";
   519               last;
   520             }
   521             # Unset intial timeout
   522             $starting_scan_break = undef;
   523         } else {
   524             ($scandefine, $publishdefine) = IO::Select->select ($scan_manages, $publish_manages, undef, undef);
   525         }
   526         foreach $scan (@$scandefine) {
   527             &{$scan_retrieves{$scan}} ($scan) if exists $scan_retrieves{$scan};
   528         }
   529         foreach $publish (@$publishdefine) {
   530             &{$publish_retrieves{$publish}}($publish) if exists $publish_retrieves{$publish};
   531         }
   532         if (defined($iteration_number)) {
   533             last unless --$iteration_number;
   534         }
   535     }
   536 }
   538 1;
   540 __END__