deprecated/buildtools/buildsystemtools/Msg.pm
author Ross Qin <ross.qin@nokia.com>
Tue, 02 Nov 2010 09:31:04 +0800
changeset 671 ff8ff850b0cf
parent 655 3f65fd25dfd4
permissions -rw-r--r--
fix the Serious problem with ROFS on-disk format

# Copyright (c) 2003-2009 Nokia Corporation and/or its subsidiary(-ies).
# All rights reserved.
# This component and the accompanying materials are made available
# under the terms of "Eclipse Public License v1.0"
# which accompanies this distribution, and is available
# at the URL "http://www.eclipse.org/legal/epl-v10.html".
#
# Initial Contributors:
# Nokia Corporation - initial contribution.
#
# Contributors:
#
# Description:
#

package Msg;
use strict;
use IO::Select;
use IO::Socket;
use Carp;

# Package-level event-loop state:
#   %scan_retrieves    - read callbacks, keyed by the stringified handle
#   %publish_retrieves - write callbacks, keyed by the stringified handle
#   $scan_manages      - IO::Select set of handles watched for readability
#   $publish_manages   - IO::Select set of handles watched for writability
use vars qw ( %scan_retrieves %publish_retrieves $scan_manages $publish_manages);

%scan_retrieves = ();
%publish_retrieves = ();
$scan_manages   = IO::Select->new();
$publish_manages   = IO::Select->new();

# Flag consulted by the blocking/non-blocking helpers and the EAGAIN check.
# NOTE(review): the "= 0" initialiser executes at run time, i.e. after the
# BEGIN block further down has already run at compile time, so the flag is
# reset to 0 once the module body has been executed. It is left untouched
# here because switching it on changes which code paths run; review it
# together with the fcntl helpers before changing it.
my $obstructing_maintained = 0;

# Running count of connections opened through this package
my $AllAssociations = 0;


BEGIN {
    # Checks if blocking is supported: try to load the POSIX constants used
    # with fcntl(). F_GETFL is imported too because the helpers read the
    # current flags before modifying them.
    eval {
        require POSIX; POSIX->import(qw (F_GETFL F_SETFL O_NONBLOCK EAGAIN));
    };
    $obstructing_maintained = 1 unless $@;
}

use Socket qw(SO_KEEPALIVE SOL_SOCKET);
# TCP-level keepalive option numbers handed to setsockopt() in this file.
# NOTE(review): these are hard-coded instead of being taken from Socket; they
# look like the Linux <netinet/tcp.h> values -- confirm before relying on them
# on another platform.
use constant TCP_KEEPIDLE  => 4; # Start keepalives after this period
use constant TCP_KEEPINTVL => 5; # Interval between keepalives
use constant TCP_KEEPCNT   => 6; # Number of keepalives before death

# AllAssociations
#
# Inputs
# (none)
#
# Outputs
# Current value of the package-wide connection counter
#
# Description
# Accessor for the number of connections this package currently counts as open
sub AllAssociations
{
  $AllAssociations;
}

# associate
#
# Inputs (in positional order, as unpacked from @_)
# $collection         (Package/class the connection object is blessed into)
# $toChange           (Port number to associate to)
# $toReceiver         (Host to associate to)
# $get_notice_process (Optional function to call on receiving data)
#
# Outputs
# The new connection object, or undef if the socket could not be created
#
# Description
# This function connects the client to the server. When a receive callback
# is supplied, the socket is registered with the event loop for reading.
sub associate {
    my ($collection, $toChange, $toReceiver, $get_notice_process) = @_;

    # Create a new internet socket.
    # The constructor key is spelled "Timeout"; the previous "TimeOut"
    # spelling is not a recognised key, so no connect timeout was applied.
    my $link = IO::Socket::INET->new (
                                      PeerAddr => $toReceiver,
                                      PeerPort => $toChange,
                                      Proto    => 'tcp',
                                      Timeout  => 10,
                                      Reuse    => 1);

    return undef unless $link;

    # Set KeepAlive. Socket::IPPROTO_TCP() is called with explicit
    # parentheses so that this sub's own @_ is not forwarded to it.
    my $tcp_level = Socket::IPPROTO_TCP();
    setsockopt($link, SOL_SOCKET, SO_KEEPALIVE,  pack("l", 1));
    setsockopt($link, $tcp_level, TCP_KEEPIDLE,  pack("l", 30));
    setsockopt($link, $tcp_level, TCP_KEEPCNT,   pack("l", 2));
    setsockopt($link, $tcp_level, TCP_KEEPINTVL, pack("l", 30));

    # Increase the total connection count
    $AllAssociations++;

    # Create a connection end-point object
    my $asso = bless {
        sock                   => $link,
        rcvd_notification_proc => $get_notice_process,
    }, $collection;

    # Set up the callback to the receive function (deferred mode)
    if ($get_notice_process) {
        my $retrieve = sub {_get($asso, 0)};
        define_result_manager ($link, "read" => $retrieve);
    }
    $asso;
}

# unplug
#
# Inputs
# $asso (Connection object)
#
# Outputs
# Result of close() on the socket; nothing if the connection had no socket
#
# Description
# This function disconnects a connection and cleans up. Calling it again on
# a connection that has already been unplugged is a no-op.
sub unplug {
    my $asso = shift;

    # Detach the socket from the connection object
    my $link = delete $asso->{sock};
    return unless defined($link);

    # Decrease the number of total connections. This is done only once a
    # live socket was found, so a repeated unplug cannot under-count.
    $AllAssociations--;

    # Stop watching this socket for reads and writes
    define_result_manager ($link, "write" => undef, "read" => undef);
    close($link);
}

# transmit_immediately
#
# Inputs
# $asso (Connection object)
# $content (Message to send)
#
# Outputs
# Whatever _transmit returns for the flush
#
# Description
# Queues the message and then flushes the whole queue straight away
# (_transmit is invoked in flush mode).
sub transmit_immediately {
    my $asso    = shift;
    my $content = shift;

    # Append the framed message to this connection's queue ...
    _lineup($asso, $content);

    # ... and push out everything that is queued (1 ==> flush)
    return $asso->_transmit(1);
}

# transmit_afterwards
#
# Inputs
# $asso (Connection object)
# $content (Message to send)
#
# Outputs
#
# Description
# Queues the message and registers a write callback so that the queue is
# sent by the event loop once the socket is reported writable. The message
# is queued even when the connection no longer has a socket; in that case
# no callback is registered.
sub transmit_afterwards {
    my $asso    = shift;
    my $content = shift;

    # Append the framed message to this connection's queue
    _lineup($asso, $content);

    # Nothing to register against if the socket is gone
    my $link = $asso->{sock};
    if (!defined($link)) {
        return;
    }

    # Deferred send: 0 ==> do not flush
    my $send_when_writable = sub {$asso->_transmit(0)};
    define_result_manager ($link, "write" => $send_when_writable);
}

# _lineup
#
# Inputs
# $asso (Connection object)
# $content (Message to send)
#
# Outputs
# Number of entries in the queue after the push
#
# Description
# Private helper: frames the message with its length and appends it to the
# send queue held in the connection object.
sub _lineup {
    my $asso    = shift;
    my $content = shift;

    # Length prefix: 4 bytes, unsigned long in network (big-endian) order
    my $header = pack ('N', length($content));

    push (@{$asso->{queue}}, $header . $content);
}

# _transmit
#
# Inputs
# $asso (Connection object)
# $remove (true ==> flush the whole queue; 0 ==> deferred mode)
#
# Outputs
# 1 on success or when the send has to be resumed later, 0 on a send error,
# nothing if the connection has no socket
#
# Description
# Private function that writes queued messages to the socket.
# In flush mode the blocking helper is invoked and every queued message is
# written; the function returns early only on an error.
# In deferred mode the non-blocking helper is invoked and at most one
# message is written per call; if the write would block part-way through,
# the offset reached is remembered in {send_offset} for the next call.
sub _transmit {
    my ($asso, $remove) = @_;
    my $link = $asso->{sock};
    return unless defined($link);
    my $pending = $asso->{queue};

    if ($remove) {
        $asso->define_obstructing();
    } else {
        $asso->define_not_obstructing();
    }

    # Resume where a previous partial send stopped, if there was one
    my $position = 0;
    $position = $asso->{send_offset} if exists $asso->{send_offset};

    MESSAGE:
    while (@$pending) {
        my $frame     = $pending->[0];
        my $remaining = length($frame) - $position;

        while ($remaining) {
            my $written = syswrite ($link, $frame, $remaining, $position);
            unless (defined($written)) {
                if (_faultwillObstruct($!)) {
                    # Should happen only in deferred mode. Remember how far
                    # we got; the write callback is expected to be set
                    # already, so sending resumes when it fires.
                    $asso->{send_offset} = $position;
                    return 1;
                }
                # Genuine failure: the message stays at the head of the queue
                $asso->manage_transmitted_fault($!);
                return 0;
            }
            $position  += $written;
            $remaining -= $written;
        }

        # Whole message sent: drop it and reset the resume state
        delete $asso->{send_offset};
        $position = 0;
        shift @$pending;

        # Deferred mode hands control back to the event loop after each message
        last MESSAGE unless $remove;
    }

    # Keep the write callback only while something is still queued
    my $next_step = @$pending ? sub {$asso->_transmit(0)} : undef;
    define_result_manager ($link, "write" => $next_step);
    return 1;  # Success
}

# _faultwillObstruct
#
# Inputs
# $_[0] (Error value to test; callers in this file pass $!)
#
# Outputs
# True if the error equals EAGAIN, false otherwise. Always 0 when the
# blocking-support flag is not set.
#
# Description
# Private function that tells whether an I/O error only means that the
# operation would have blocked.
sub _faultwillObstruct {
    # Without the POSIX constants there is no EAGAIN to compare against
    return 0 unless $obstructing_maintained;

    return ($_[0] == EAGAIN());
}

# define_not_obstructing
#
# Inputs
# $_[0] (Connection object holding the socket in {sock}; a bare socket
#        handle is accepted as well)
#
# Outputs
#
# Description
# Sets O_NONBLOCK on the connection's socket, preserving the other fcntl
# flags. Does nothing when the blocking-support flag is not set or when the
# connection has no socket.
sub define_not_obstructing {                        # $asso->define_not_obstructing
    return unless $obstructing_maintained;

    # This file calls the helper as a method on the connection object, so
    # $_[0] is normally a hash-based object, not a file handle; fcntl()
    # needs the socket stored inside it.
    my $target = $_[0];
    my $link   = UNIVERSAL::isa($target, 'HASH') ? $target->{sock} : $target;
    return unless defined($link);

    # Fully qualified because F_GETFL is not among the names this module
    # imports from POSIX.
    my $pins = fcntl ($link, POSIX::F_GETFL(), 0);
    return unless defined($pins);

    # preserve other fcntl flags
    fcntl ($link, F_SETFL(), $pins | O_NONBLOCK());
}

# define_obstructing
#
# Inputs
# $_[0] (Connection object holding the socket in {sock}; a bare socket
#        handle is accepted as well)
#
# Outputs
#
# Description
# Clears O_NONBLOCK on the connection's socket, preserving the other fcntl
# flags. Does nothing when the blocking-support flag is not set or when the
# connection has no socket.
sub define_obstructing {
    return unless $obstructing_maintained;

    # This file calls the helper as a method on the connection object, so
    # $_[0] is normally a hash-based object, not a file handle; fcntl()
    # needs the socket stored inside it.
    my $target = $_[0];
    my $link   = UNIVERSAL::isa($target, 'HASH') ? $target->{sock} : $target;
    return unless defined($link);

    # Fully qualified because F_GETFL is not among the names this module
    # imports from POSIX.
    my $pins = fcntl ($link, POSIX::F_GETFL(), 0);
    return unless defined($pins);

    $pins  &= ~O_NONBLOCK(); # Clear non-blocking, but preserve other flags
    fcntl ($link, F_SETFL(), $pins);
}

# manage_transmitted_fault
#
# Inputs
# $asso (Connection object)
# $fault_content (Error message)
#
# Outputs
#
# Description
# Default reaction to a send error: emit a warning and stop watching the
# connection's socket for writability. Subclass Msg and rebless $asso for
# more meaningful handling of send errors.
sub manage_transmitted_fault {
   my $asso          = shift;
   my $fault_content = shift;

   warn "Error while sending: $fault_content \n";

   # Drop the write callback for this socket
   my $link = $asso->{sock};
   define_result_manager ($link, "write" => undef);
}

#-----------------------------------------------------------------
# Receive side routines

# recent_agent
#
# Inputs (in positional order, as unpacked from @_)
# $collection        (Package; stored for blessing accepted connections)
# $changes           (Reference to an array of port numbers to try in turn)
# $mi_receiver       (Hostname of the interface to use)
# $enter_process     (Reference to function to call when accepting a connection)
# $iAssociationBreak (Optional: seconds to sleep after a failed attempt, default 0)
# $PlugAssociations  (Optional: number of passes over the port list before
#                     giving up, default -1 = retry forever)
#
# Outputs
#
# Description
# Creates the listening socket, rotating through the supplied ports until
# one can be opened, and registers it with the event loop. Dies when a
# retry limit was given and it has been reached.
my ($g_enter_process,$g_collection);
my $primary_plug = 0;
sub recent_agent {
    @_ >= 4 || die "Msg->recent_agent (myhost, myport, login_proc)\n";
    my ($collection, $changes, $mi_receiver, $enter_process, $iAssociationBreak, $PlugAssociations) = @_;
    # Set a default Socket timeout value
    $iAssociationBreak = 0 if (!defined $iAssociationBreak);
    # Set a default Socket retry to be forever
    $PlugAssociations = -1 if (!defined $PlugAssociations);

    # Number of attempts made so far; starts at 0 so the arithmetic below
    # never works on an undefined value
    my $RepeatNumber = 0;

    while(!$primary_plug)
    {
        #Check to see if there is a retry limit and if the limit has been reached
        if ($PlugAssociations != -1)
        {
            # Attempts are counted per port, the limit per pass over the
            # list. "|| 1" keeps an empty list from dividing by zero.
            my $passes = $RepeatNumber / (scalar(@$changes) || 1);
            if ($passes >= $PlugAssociations)
            {
                die "ERROR: could not create socket after ".$passes." attempts";
            }
            # Increment the number of retries
            $RepeatNumber++;
        }

        #Try the first port on the list
        my $mi_change = shift(@$changes);
        #Place the port on the back of the queue
        push @$changes,$mi_change;

        print "Using port number $mi_change\n";
        # The constructor key is spelled "Timeout"; the previous "TimeOut"
        # spelling is not a recognised key and was ignored.
        $primary_plug = IO::Socket::INET->new (
                                              LocalAddr => $mi_receiver,
                                              LocalPort => $mi_change,
                                              Listen    => 5,
                                              Proto     => 'tcp',
                                              Timeout   => 10,
                                              Reuse     => 1);
        sleep $iAssociationBreak if (!$primary_plug);
    }

    # Set KeepAlive. Socket::IPPROTO_TCP() is called with explicit
    # parentheses so that this sub's own @_ is not forwarded to it.
    my $tcp_level = Socket::IPPROTO_TCP();
    setsockopt($primary_plug, SOL_SOCKET, SO_KEEPALIVE,  pack("l", 1));
    setsockopt($primary_plug, $tcp_level, TCP_KEEPIDLE,  pack("l", 30));
    setsockopt($primary_plug, $tcp_level, TCP_KEEPCNT,   pack("l", 2));
    setsockopt($primary_plug, $tcp_level, TCP_KEEPINTVL, pack("l", 30));

    # Add the socket to the list of filehandles to read from.
    define_result_manager ($primary_plug, "read" => \&_recent_node);
    # Store the package name and login proc for later use
    $g_enter_process = $enter_process; $g_collection = $collection;
}

# get_immediately
#
# Inputs
# $asso (Connection object)
#
# Outputs
# List context: (message, error); otherwise just the message
#
# Description
# Reads one message from the connection right now by calling _get in
# "receive now" mode.
sub get_immediately {
    my $asso = shift;

    my ($content, $fault) = _get ($asso, 1); # 1 ==> rcv now

    return ($content, $fault) if wantarray;
    return $content;
}

# _get
#
# Inputs
# $asso (Connection object)
# $get_immediately (true ==> return the message to the caller;
#                   false ==> deliver it to the {rcvd_notification_proc} callback)
#
# Outputs
# In "receive now" mode: the list (message, $!).
# Otherwise whatever the callback returns, or nothing when the read has to
# be resumed later or the connection has no socket.
#
# Description
# Private function, complement to _transmit. Reads the 4-byte network-order
# length header and then the message body. If the body read would block,
# what has been received so far is parked in {msg} / {bytes_to_read} and the
# function returns so that a later call can resume. When nothing at all was
# received the connection is unplugged before the result is reported.
sub _get {                     # Complement to _transmit
    my ($asso, $get_immediately) = @_; # $get_immediately complement of $remove
    my $link = $asso->{sock};
    return unless defined($link);

    # Find out how much has already been received, if at all
    my ($content, $branch, $sequencetoScan, $sequence_scan);
    if (exists $asso->{msg}) {
        $content        = delete $asso->{msg};   # take over the partial data
        # Continue right after the bytes already held, so that none of them
        # is overwritten by the next sysread.
        $branch         = length($content);
        $sequencetoScan = $asso->{bytes_to_read};
    } else {
        # The typical case ...
        $content        = "";
        $branch         = 0;
        $sequencetoScan = 0;                     # Will get set soon
    }

    # We want to read the message length in blocking mode. Quite
    # unlikely that we'll get blocked too long reading 4 bytes
    my $header_ok = 1;
    if (!$sequencetoScan) {                      # Get new length
        my $storage;
        $asso->define_obstructing();
        # Start from a clean errno: sysread does not clear $! on success, so
        # a value left over from unrelated earlier calls must not be
        # mistaken for a failure of this read.
        $! = 0;
        $sequence_scan = sysread($link, $storage, 4);
        if (defined($sequence_scan) && $sequence_scan == 4) {
            $sequencetoScan = unpack ('N', $storage);
        } else {
            # Error, end of file or short header: skip the body
            $header_ok = 0;
        }
    }

    if ($header_ok) {
        $asso->define_not_obstructing() unless $get_immediately;
        while ($sequencetoScan) {
            $sequence_scan = sysread ($link, $content, $sequencetoScan, $branch);
            if (!defined($sequence_scan)) {
                if (_faultwillObstruct($!)) {
                    # Should come here only in non-blocking mode
                    $asso->{msg}           = $content;
                    $asso->{bytes_to_read} = $sequencetoScan;
                    return ;   # .. _get will be called later
                               # when socket is readable again
                }
                last;          # genuine read error
            }
            last if $sequence_scan == 0;   # end of file before the body was complete
            $sequencetoScan -= $sequence_scan;
            $branch         += $sequence_scan;
        }
    }

    # Nothing received at all: drop the connection
    if (length($content) == 0) {
        $asso->unplug();
    }
    if ($get_immediately) {
        return ($content, $!);
    }
    $asso->{rcvd_notification_proc}->($asso, $content, $!);
}

# _recent_node
#
# Inputs
# (none used; registered as the read callback of the listening socket)
#
# Outputs
#
# Description
# Accepts a pending connection on the listening socket, wraps it in a
# connection object blessed into the stored package and passes it to the
# stored login function together with the peer host and port. If the login
# function returns a receive callback, the new socket is registered for
# reading; otherwise the connection is unplugged again.
sub _recent_node {
    my $link = $primary_plug->accept();
    # accept() can fail; there is then no peer to count or to query
    return unless $link;

    $AllAssociations++;
    my $asso = bless {
        'sock' =>  $link,
        'state' => 'connected'
    }, $g_collection;
    my $get_notice_process =
        $g_enter_process->($asso, $link->peerhost(), $link->peerport());
    if ($get_notice_process) {
        $asso->{rcvd_notification_proc} = $get_notice_process;
        my $retrieve = sub {_get($asso,0)};
        define_result_manager ($link, "read" => $retrieve);
    } else {  # Login failed
        $asso->unplug();
    }
}

#----------------------------------------------------
# Event loop routines used by both client and server

# define_result_manager
#
# Inputs
# $manage (Handle to watch; a leading non-reference argument such as a
#          package name is skipped)
# "write" => callback or false value (optional)
# "read"  => callback or false value (optional)
#
# Outputs
#
# Description
# Registers or removes the event-loop callbacks for a handle. A true value
# stores the callback and adds the handle to the matching select set; a
# false value (e.g. undef) removes both. Event kinds that are not mentioned
# are left untouched.
sub define_result_manager {
    shift unless ref($_[0]); # shift if first arg is package name
    my ($manage, %parameters) = @_;

    if (exists $parameters{'write'}) {
        my $on_write = $parameters{'write'};
        if (!$on_write) {
            delete $publish_retrieves{$manage};
            $publish_manages->remove($manage);
        } else {
            $publish_retrieves{$manage} = $on_write;
            $publish_manages->add($manage);
        }
    }

    if (exists $parameters{'read'}) {
        my $on_read = $parameters{'read'};
        if (!$on_read) {
            delete $scan_retrieves{$manage};
            $scan_manages->remove($manage);
        } else {
            $scan_retrieves{$manage} = $on_read;
            $scan_manages->add($manage);
        }
    }
}

# result_iteration
#
# Inputs
# $collection          (Package name; not used)
# $starting_scan_break (Optional timeout in seconds, applied to the first
#                       select only)
# $iteration_number    (Optional number of passes; result_iteration(1)
#                       processes events once)
#
# Outputs
#
# Description
# The event loop. Waits on the registered read and write handles and calls
# the callback stored for each handle that is ready. The loop ends when no
# handles are registered any more, when the requested number of passes has
# been made, or when the initial timeout expires without any result (a
# warning is printed in that case).
sub result_iteration {
    my ($collection, $starting_scan_break, $iteration_number) = @_;
    my ($ready_to_read, $ready_to_write);

    # Keep going only while there are handles left to process
    while ($scan_manages->count() || $publish_manages->count()) {
        if (!defined $starting_scan_break) {
            ($ready_to_read, $ready_to_write) = IO::Select->select ($scan_manages, $publish_manages, undef, undef);
        } else {
            ($ready_to_read, $ready_to_write) = IO::Select->select ($scan_manages, $publish_manages, undef, $starting_scan_break);
            # Nothing came back within the initial timeout: give up
            if (!defined $ready_to_read)
            {
              print "WARNING: no response from server within $starting_scan_break seconds\n";
              last;
            }
            # The timeout applies to the first wait only
            $starting_scan_break = undef;
        }

        # A callback may have been removed meanwhile, hence the exists checks
        foreach my $readable (@$ready_to_read) {
            $scan_retrieves{$readable}->($readable) if exists $scan_retrieves{$readable};
        }
        foreach my $writable (@$ready_to_write) {
            $publish_retrieves{$writable}->($writable) if exists $publish_retrieves{$writable};
        }

        if (defined($iteration_number)) {
            last unless --$iteration_number;
        }
    }
}

1;

__END__