deprecated/buildtools/buildsystemtools/Msg.pm
changeset 655 3f65fd25dfd4
equal deleted inserted replaced
649:02d78c9f018e 655:3f65fd25dfd4
       
     1 # Copyright (c) 2003-2009 Nokia Corporation and/or its subsidiary(-ies).
       
     2 # All rights reserved.
       
     3 # This component and the accompanying materials are made available
       
     4 # under the terms of "Eclipse Public License v1.0"
       
     5 # which accompanies this distribution, and is available
       
     6 # at the URL "http://www.eclipse.org/legal/epl-v10.html".
       
     7 #
       
     8 # Initial Contributors:
       
     9 # Nokia Corporation - initial contribution.
       
    10 #
       
    11 # Contributors:
       
    12 #
       
    13 # Description:
       
    14 #
       
    15 
       
    16 package Msg;
       
    17 use strict;
       
    18 use IO::Select;
       
    19 use IO::Socket;
       
    20 use Carp;
       
    21 
       
    22 use vars qw ( %scan_retrieves %publish_retrieves $scan_manages $publish_manages);
       
    23 
       
    24  %scan_retrieves = ();
       
    25 %publish_retrieves = ();
       
    26 $scan_manages   = IO::Select->new();
       
    27 $publish_manages   = IO::Select->new();
       
    28 my $obstructing_maintained = 0;
       
    29 
       
    30 my $AllAssociations = 0;
       
    31 
       
    32 
       
    33 BEGIN {
       
    34     # Checks if blocking is supported
       
    35     eval {
       
    36         require POSIX; POSIX->import(qw (F_SETFL O_NONBLOCK EAGAIN));
       
    37     };
       
    38     $obstructing_maintained = 1 unless $@;
       
    39 }
       
    40 
       
    41 use Socket qw(SO_KEEPALIVE SOL_SOCKET);
       
    42 use constant TCP_KEEPIDLE  => 4; # Start keeplives after this period
       
    43 use constant TCP_KEEPINTVL => 5; # Interval between keepalives
       
    44 use constant TCP_KEEPCNT   => 6; # Number of keepalives before death
       
    45 
       
    46 # AllAssociations
       
    47 #
       
    48 # Inputs
       
    49 #
       
    50 # Outputs
       
    51 #
       
    52 # Description
       
    53 # This function returns the total number of connections
       
    54 sub AllAssociations
       
    55 {
       
    56   return $AllAssociations;
       
    57 }
       
    58 
       
    59 # associate
       
    60 #
       
    61 # Inputs
       
    62 # $collection
       
    63 # $toReceiver (Host associate to)
       
    64 # $toChange (Port number to associate to)
       
    65 # $get_notice_process (Function to call on recieving data)
       
    66 #
       
    67 # Outputs
       
    68 #
       
    69 # Description
       
    70 # This function connects the client to the server
       
    71 sub associate {
       
    72     my ($collection, $toChange, $toReceiver, $get_notice_process) = @_;
       
    73     
       
    74     # Create a new internet socket
       
    75     
       
    76     my $link = IO::Socket::INET->new (
       
    77                                       PeerAddr => $toReceiver,
       
    78                                       PeerPort => $toChange,
       
    79                                       Proto    => 'tcp',
       
    80                                       TimeOut => 10,
       
    81                                       Reuse    => 1);
       
    82 
       
    83     return undef unless $link;
       
    84 
       
    85     # Set KeepAlive
       
    86     setsockopt($link, SOL_SOCKET, SO_KEEPALIVE,  pack("l", 1));
       
    87     setsockopt($link, &Socket::IPPROTO_TCP, TCP_KEEPIDLE,  pack("l", 30));
       
    88     setsockopt($link, &Socket::IPPROTO_TCP, TCP_KEEPCNT,   pack("l", 2));
       
    89     setsockopt($link, &Socket::IPPROTO_TCP, TCP_KEEPINTVL, pack("l", 30));
       
    90   
       
    91     # Increse the total connection count
       
    92     $AllAssociations++;
       
    93 
       
    94     # Create a connection end-point object
       
    95     my $asso = bless {
       
    96         sock                   => $link,
       
    97         rcvd_notification_proc => $get_notice_process,
       
    98     }, $collection;
       
    99     
       
   100       # Set up the callback to the rcv function
       
   101     if ($get_notice_process) {
       
   102         my $retrieve = sub {_get($asso, 0)};
       
   103         define_result_manager ($link, "read" => $retrieve);
       
   104     }
       
   105     $asso;
       
   106 }
       
   107 
       
   108 # unplug
       
   109 #
       
   110 # Inputs
       
   111 # $asso (Connection object)
       
   112 #
       
   113 # Outputs
       
   114 #
       
   115 # Description
       
   116 # This function disconnects a connection and cleans up
       
   117 sub unplug {
       
   118     my $asso = shift;
       
   119     
       
   120     # Decrease the number of total connections
       
   121     $AllAssociations--;
       
   122     
       
   123     # Delete the socket
       
   124     my $link = delete $asso->{sock};
       
   125     return unless defined($link);
       
   126     # Set to not try and check for reads and writes of this socket
       
   127     define_result_manager ($link, "write" => undef, "read" => undef);
       
   128     close($link);
       
   129 }
       
   130 
       
   131 # transmit_immediately
       
   132 #
       
   133 # Inputs
       
   134 # $asso (Connection object)
       
   135 # $content (Message to send)
       
   136 #
       
   137 # Outputs
       
   138 #
       
   139 # Description
       
   140 # This function does a immediate send, this will block if the socket is not writeable
       
   141 sub transmit_immediately {
       
   142     my ($asso, $content) = @_;
       
   143     
       
   144     # Puts the message in the queue
       
   145     _lineup ($asso, $content);
       
   146     # Flushes the queue
       
   147     $asso->_transmit (1); # 1 ==> flush
       
   148 }
       
   149 
       
   150 # transmit_afterwards
       
   151 #
       
   152 # Inputs
       
   153 # $asso (Connection object)
       
   154 # $content (Message to send)
       
   155 #
       
   156 # Outputs
       
   157 #
       
   158 # Description
       
   159 # This function does a sends at a later time, does not block if the socket is not writeable.
       
   160 # It sets a callback to send the data in the queue when the socket is writeable
       
   161 sub transmit_afterwards {
       
   162     my ($asso, $content) = @_;
       
   163     
       
   164     # Puts the message in the queue
       
   165     _lineup($asso, $content);
       
   166     # Get the current socket
       
   167     my $link = $asso->{sock};
       
   168     return unless defined($link);
       
   169     # Sets the callback to send the data when the socket is writeable
       
   170     define_result_manager ($link, "write" => sub {$asso->_transmit(0)});
       
   171 }
       
   172 
       
   173 # _lineup
       
   174 #
       
   175 # Inputs
       
   176 # $asso (Connection object)
       
   177 # $content (Message to send)
       
   178 #
       
   179 # Outputs
       
   180 #
       
   181 # Description
       
   182 # This is a private function to place the message on the queue for this socket
       
   183 sub _lineup {
       
   184     my ($asso, $content) = @_;
       
   185     # prepend length (encoded as network long)
       
   186     my $dist = length($content);
       
   187     # Stores the length as a network long in the first 4 bytes of the message
       
   188     $content = pack ('N', $dist) . $content; 
       
   189     push (@{$asso->{queue}}, $content);
       
   190 }
       
   191 
       
   192 # _transmit
       
   193 #
       
   194 # Inputs
       
   195 # $asso (Connection object)
       
   196 # $remove (Deferred Mode)
       
   197 #
       
   198 # Outputs
       
   199 #
       
   200 # Description
       
   201 # This is a private function sends the data
       
   202 sub _transmit {
       
   203     my ($asso, $remove) = @_;
       
   204     my $link = $asso->{sock};
       
   205     return unless defined($link);
       
   206     my ($Lrq) = $asso->{queue};
       
   207 
       
   208     # If $remove is set, set the socket to blocking, and send all
       
   209     # messages in the queue - return only if there's an error
       
   210     # If $remove is 0 (deferred mode) make the socket non-blocking, and
       
   211     # return to the event loop only after every message, or if it
       
   212     # is likely to block in the middle of a message.
       
   213 
       
   214     $remove ? $asso->define_obstructing() : $asso->define_not_obstructing();
       
   215     my $branch = (exists $asso->{send_offset}) ? $asso->{send_offset} : 0;
       
   216 
       
   217     # Loop through the messages in the queue
       
   218     while (@$Lrq) {
       
   219         my $content            = $Lrq->[0];
       
   220         my $sequencetoPublish = length($content) - $branch;
       
   221         my $sequence_published  = 0;
       
   222         while ($sequencetoPublish) {
       
   223             $sequence_published = syswrite ($link, $content,
       
   224                                        $sequencetoPublish, $branch);
       
   225             if (!defined($sequence_published)) {
       
   226                 if (_faultwillObstruct($!)) {
       
   227                     # Should happen only in deferred mode. Record how
       
   228                     # much we have already sent.
       
   229                     $asso->{send_offset} = $branch;
       
   230                     # Event handler should already be set, so we will
       
   231                     # be called back eventually, and will resume sending
       
   232                     return 1;
       
   233                 } else {    # Uh, oh
       
   234                     $asso->manage_transmitted_fault($!);
       
   235                     return 0; # fail. Message remains in queue ..
       
   236                 }
       
   237             }
       
   238             $branch         += $sequence_published;
       
   239             $sequencetoPublish -= $sequence_published;
       
   240         }
       
   241         delete $asso->{send_offset};
       
   242         $branch = 0;
       
   243         shift @$Lrq;
       
   244         last unless $remove; # Go back to select and wait
       
   245                             # for it to fire again.
       
   246     }
       
   247     # Call me back if queue has not been drained.
       
   248     if (@$Lrq) {
       
   249         define_result_manager ($link, "write" => sub {$asso->_transmit(0)});
       
   250     } else {
       
   251         define_result_manager ($link, "write" => undef);
       
   252     }
       
   253     1;  # Success
       
   254 }
       
   255 
       
   256 # _faultwillObstruct
       
   257 #
       
   258 # Inputs
       
   259 # $asso (Connection object)
       
   260 #
       
   261 # Outputs
       
   262 #
       
   263 # Description
       
   264 # This is a private function processes the blocking error message
       
   265 sub _faultwillObstruct {
       
   266     if ($obstructing_maintained) {
       
   267         return ($_[0] == EAGAIN());
       
   268     }
       
   269     return 0;
       
   270 }
       
   271 
       
   272 # define_not_obstructing
       
   273 #
       
   274 # Inputs
       
   275 # $_[0] (Connection socket)
       
   276 #
       
   277 # Outputs
       
   278 #
       
   279 # Description
       
   280 # This is a function set non-blocking on a socket
       
   281 sub define_not_obstructing {                        # $asso->define_obstructing
       
   282     if ($obstructing_maintained) {
       
   283         # preserve other fcntl flags
       
   284         my $pins = fcntl ($_[0], F_GETFL(), 0);
       
   285         fcntl ($_[0], F_SETFL(), $pins | O_NONBLOCK());
       
   286     }
       
   287 }
       
   288 
       
   289 # define_obstructing
       
   290 #
       
   291 # Inputs
       
   292 # $_[0] (Connection socket)
       
   293 #
       
   294 # Outputs
       
   295 #
       
   296 # Description
       
   297 # This is a function set blocking on a socket
       
   298 sub define_obstructing {
       
   299     if ($obstructing_maintained) {
       
   300         my $pins = fcntl ($_[0], F_GETFL(), 0);
       
   301         $pins  &= ~O_NONBLOCK(); # Clear blocking, but preserve other flags
       
   302         fcntl ($_[0], F_SETFL(), $pins);
       
   303     }
       
   304 }
       
   305 
       
   306 # manage_transmitted_fault
       
   307 #
       
   308 # Inputs
       
   309 # $asso (Connection object)
       
   310 # $fault_content (Error message)
       
   311 #
       
   312 # Outputs
       
   313 #
       
   314 # Description
       
   315 # This is a function warns on send errors and removes the socket from list of writable sockets
       
   316 sub manage_transmitted_fault {
       
   317    # For more meaningful handling of send errors, subclass Msg and
       
   318    # rebless $asso.  
       
   319    my ($asso, $fault_content) = @_;
       
   320    warn "Error while sending: $fault_content \n";
       
   321    define_result_manager ($asso->{sock}, "write" => undef);
       
   322 }
       
   323 
       
   324 #-----------------------------------------------------------------
       
   325 # Receive side routines
       
   326 
       
   327 # recent_agent
       
   328 #
       
   329 # Inputs
       
   330 # $collection (Package)
       
   331 # $mi_receiver (Hostname of the interface to use)
       
   332 # $mi_change (Port number to listen on)
       
   333 # $enter_process (Reference to function to call when accepting a connection)
       
   334 #
       
   335 # Outputs
       
   336 #
       
   337 # Description
       
   338 # This is a function create a listening socket
       
   339 my ($g_enter_process,$g_collection);
       
   340 my $primary_plug = 0;
       
   341 sub recent_agent {
       
   342     @_ >= 4 || die "Msg->recent_agent (myhost, myport, login_proc)\n";
       
   343     my ($RepeatNumber);
       
   344     my ($collection, $changes, $mi_receiver, $enter_process, $iAssociationBreak, $PlugAssociations) = @_;
       
   345     # Set a default Socket timeout value
       
   346     $iAssociationBreak = 0 if (!defined $iAssociationBreak);
       
   347     # Set a default Socket retry to be forever
       
   348     $PlugAssociations = -1 if (!defined $PlugAssociations);
       
   349     
       
   350     while(!$primary_plug)
       
   351     {
       
   352         #Check to see if there is a retry limit and if the limit has been reached
       
   353         if ($PlugAssociations != -1)
       
   354         {
       
   355             if (($RepeatNumber / scalar(@$changes)) >= $PlugAssociations)
       
   356             {
       
   357                 die "ERROR: could not create socket after ".$RepeatNumber / scalar(@$changes)." attempts";            
       
   358             } else {
       
   359                 # Increment the number of retries
       
   360                 $RepeatNumber++;
       
   361             }
       
   362         }
       
   363         
       
   364         #Try the first port on the list
       
   365         my $mi_change = shift(@$changes);
       
   366         #Place the port on the back of the queue
       
   367         push @$changes,$mi_change;
       
   368         
       
   369         print "Using port number $mi_change\n";
       
   370         $primary_plug = IO::Socket::INET->new (
       
   371                                               LocalAddr => $mi_receiver,
       
   372                                               LocalPort => $mi_change,
       
   373                                               Listen    => 5,
       
   374                                               Proto     => 'tcp',
       
   375                                               TimeOut =>    10,
       
   376                                               Reuse     => 1);
       
   377         sleep $iAssociationBreak if (!$primary_plug);
       
   378     }
       
   379     
       
   380     # Set KeepAlive
       
   381     setsockopt($primary_plug, SOL_SOCKET, SO_KEEPALIVE,  pack("l", 1));
       
   382     setsockopt($primary_plug, &Socket::IPPROTO_TCP, TCP_KEEPIDLE,  pack("l", 30));
       
   383     setsockopt($primary_plug, &Socket::IPPROTO_TCP, TCP_KEEPCNT,   pack("l", 2));
       
   384     setsockopt($primary_plug, &Socket::IPPROTO_TCP, TCP_KEEPINTVL, pack("l", 30));
       
   385     
       
   386     # Add the socket to the list on filehandles to read from.
       
   387     define_result_manager ($primary_plug, "read" => \&_recent_node);
       
   388     # Store the package name and login proc for later use
       
   389     $g_enter_process = $enter_process; $g_collection = $collection;
       
   390 }
       
   391 
       
   392 sub get_immediately {
       
   393     my ($asso) = @_;
       
   394     my ($content, $fault) = _get ($asso, 1); # 1 ==> rcv now
       
   395     return wantarray ? ($content, $fault) : $content;
       
   396 }
       
   397 
       
   398 sub _get {                     # Complement to _transmit
       
   399     my ($asso, $get_immediately) = @_; # $get_immediately complement of $remove
       
   400     # Find out how much has already been received, if at all
       
   401     my ($content, $branch, $sequencetoScan, $sequence_scan);
       
   402     my $link = $asso->{sock};
       
   403     return unless defined($link);
       
   404     if (exists $asso->{msg}) {
       
   405         $content           = $asso->{msg};
       
   406         $branch        = length($content) - 1;  # sysread appends to it.
       
   407         $sequencetoScan = $asso->{bytes_to_read};
       
   408         delete $asso->{'msg'};              # have made a copy
       
   409     } else {
       
   410         # The typical case ...
       
   411         $content           = "";                # Otherwise -w complains 
       
   412         $branch        = 0 ;  
       
   413         $sequencetoScan = 0 ;                # Will get set soon
       
   414     }
       
   415     # We want to read the message length in blocking mode. Quite
       
   416     # unlikely that we'll get blocked too long reading 4 bytes
       
   417     if (!$sequencetoScan)  {                 # Get new length 
       
   418         my $storage;
       
   419         $asso->define_obstructing();
       
   420         $sequence_scan = sysread($link, $storage, 4);
       
   421         if ($! || ($sequence_scan != 4)) {
       
   422             goto FINISH;
       
   423         }
       
   424         $sequencetoScan = unpack ('N', $storage);
       
   425     }
       
   426     $asso->define_not_obstructing() unless $get_immediately;
       
   427     while ($sequencetoScan) {
       
   428         $sequence_scan = sysread ($link, $content, $sequencetoScan, $branch);
       
   429         if (defined ($sequence_scan)) {
       
   430             if ($sequence_scan == 0) {
       
   431                 last;
       
   432             }
       
   433             $sequencetoScan -= $sequence_scan;
       
   434             $branch        += $sequence_scan;
       
   435         } else {
       
   436             if (_faultwillObstruct($!)) {
       
   437                 # Should come here only in non-blocking mode
       
   438                 $asso->{msg}           = $content;
       
   439                 $asso->{bytes_to_read} = $sequencetoScan;
       
   440                 return ;   # .. _get will be called later
       
   441                            # when socket is readable again
       
   442             } else {
       
   443                 last;
       
   444             }
       
   445         }
       
   446     }
       
   447 
       
   448   FINISH:
       
   449     if (length($content) == 0) {
       
   450         $asso->unplug();
       
   451     }
       
   452     if ($get_immediately) {
       
   453         return ($content, $!);
       
   454     } else {
       
   455         &{$asso->{rcvd_notification_proc}}($asso, $content, $!);
       
   456     }
       
   457 }
       
   458 
       
   459 sub _recent_node {
       
   460     my $link = $primary_plug->accept();
       
   461     $AllAssociations++;
       
   462     my $asso = bless {
       
   463         'sock' =>  $link,
       
   464         'state' => 'connected'
       
   465     }, $g_collection;
       
   466     my $get_notice_process =
       
   467         &$g_enter_process ($asso, $link->peerhost(), $link->peerport());
       
   468     if ($get_notice_process) {
       
   469         $asso->{rcvd_notification_proc} = $get_notice_process;
       
   470         my $retrieve = sub {_get($asso,0)};
       
   471         define_result_manager ($link, "read" => $retrieve);
       
   472     } else {  # Login failed
       
   473         $asso->unplug();
       
   474     }
       
   475 }
       
   476 
       
   477 #----------------------------------------------------
       
   478 # Event loop routines used by both client and server
       
   479 
       
   480 sub define_result_manager {
       
   481     shift unless ref($_[0]); # shift if first arg is package name
       
   482     my ($manage, %parameters) = @_;
       
   483     my $retrieve;
       
   484     if (exists $parameters{'write'}) {
       
   485         $retrieve = $parameters{'write'};
       
   486         if ($retrieve) {
       
   487             $publish_retrieves{$manage} = $retrieve;
       
   488             $publish_manages->add($manage);
       
   489         } else {
       
   490             delete $publish_retrieves{$manage};
       
   491             $publish_manages->remove($manage);
       
   492         }
       
   493     }
       
   494     if (exists $parameters{'read'}) {
       
   495         $retrieve = $parameters{'read'};
       
   496         if ($retrieve) {
       
   497             $scan_retrieves{$manage} = $retrieve;
       
   498             $scan_manages->add($manage);
       
   499         } else {
       
   500             delete $scan_retrieves{$manage};
       
   501             $scan_manages->remove($manage);
       
   502        }
       
   503     }
       
   504 }
       
   505 
       
   506 sub result_iteration {
       
   507     my ($collection, $starting_scan_break, $iteration_number) = @_; # result_iteration(1) to process events once
       
   508     my ($asso, $scan, $publish, $scandefine, $publishdefine);
       
   509     while (1) {
       
   510         # Quit the loop if no handles left to process
       
   511         last unless ($scan_manages->count() || $publish_manages->count());
       
   512         if (defined $starting_scan_break)
       
   513         {
       
   514             ($scandefine, $publishdefine) = IO::Select->select ($scan_manages, $publish_manages, undef, $starting_scan_break);
       
   515             # On initial timeout a read expect a read within timeout if not disconnect
       
   516             if (!defined $scandefine)
       
   517             {
       
   518               print "WARNING: no response from server within $starting_scan_break seconds\n";
       
   519               last;
       
   520             }
       
   521             # Unset intial timeout
       
   522             $starting_scan_break = undef;
       
   523         } else {
       
   524             ($scandefine, $publishdefine) = IO::Select->select ($scan_manages, $publish_manages, undef, undef);
       
   525         }
       
   526         foreach $scan (@$scandefine) {
       
   527             &{$scan_retrieves{$scan}} ($scan) if exists $scan_retrieves{$scan};
       
   528         }
       
   529         foreach $publish (@$publishdefine) {
       
   530             &{$publish_retrieves{$publish}}($publish) if exists $publish_retrieves{$publish};
       
   531         }
       
   532         if (defined($iteration_number)) {
       
   533             last unless --$iteration_number;
       
   534         }
       
   535     }
       
   536 }
       
   537 
       
   538 1;
       
   539 
       
   540 __END__
       
   541