deprecated/buildtools/buildsystemtools/Msg.pm
author lorewang
Wed, 17 Nov 2010 11:24:29 +0800
changeset 685 39f7ecf8fbc7
parent 655 3f65fd25dfd4
permissions -rw-r--r--
pull from trunk
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
655
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
     1
# Copyright (c) 2003-2009 Nokia Corporation and/or its subsidiary(-ies).
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
     2
# All rights reserved.
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
     3
# This component and the accompanying materials are made available
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
     4
# under the terms of "Eclipse Public License v1.0"
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
     5
# which accompanies this distribution, and is available
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
     6
# at the URL "http://www.eclipse.org/legal/epl-v10.html".
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
     7
#
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
     8
# Initial Contributors:
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
     9
# Nokia Corporation - initial contribution.
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
    10
#
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
    11
# Contributors:
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
    12
#
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
    13
# Description:
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
    14
#
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
    15
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
    16
package Msg;
use strict;
use IO::Select;
use IO::Socket;
use Carp;

# Package globals shared by the event-handler bookkeeping:
#   %scan_retrieves / %publish_retrieves - callback tables
#   $scan_manages  / $publish_manages    - IO::Select sets
use vars qw ( %scan_retrieves %publish_retrieves $scan_manages $publish_manages);

%scan_retrieves = ();
%publish_retrieves = ();
$scan_manages   = IO::Select->new();
$publish_manages   = IO::Select->new();

# NOTE(review): this "= 0" assignment runs when the module body executes,
# i.e. after the BEGIN block below has already run at compile time, so any
# value stored by BEGIN is overwritten and the flag reads 0 afterwards.
# Dropping the initializer would switch on the fcntl-based (non-)blocking
# code paths, so that change needs testing on every supported platform first.
my $obstructing_maintained = 0;

# Number of connections currently counted as open
my $AllAssociations = 0;


BEGIN {
    # Checks if blocking is supported.  F_GETFL is imported as well because
    # the blocking/non-blocking helpers in this file call F_GETFL().
    eval {
        require POSIX; POSIX->import(qw (F_GETFL F_SETFL O_NONBLOCK EAGAIN));
    };
    $obstructing_maintained = 1 unless $@;
}

use Socket qw(SO_KEEPALIVE SOL_SOCKET);
# TCP-level keepalive option numbers, defined locally.
# NOTE(review): these look like the Linux values - confirm for other platforms.
use constant TCP_KEEPIDLE  => 4; # Start keepalives after this period
use constant TCP_KEEPINTVL => 5; # Interval between keepalives
use constant TCP_KEEPCNT   => 6; # Number of keepalives before death
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
    45
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
    46
# AllAssociations
#
# Inputs
#
# Outputs
# The current value of the module-level connection counter
#
# Description
# This function returns the total number of connections
sub AllAssociations
{
  # The counter is the last (and only) expression, so it is the return value
  $AllAssociations;
}
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
    58
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
    59
# associate
#
# Inputs
# $collection (Package the connection object is blessed into)
# $toChange (Port number to associate to)
# $toReceiver (Host to associate to)
# $get_notice_process (Function to call on receiving data; may be omitted)
#
# Outputs
# The new connection object, or undef if the socket could not be created
#
# Description
# This function connects the client to the server
sub associate {
    my ($collection, $toChange, $toReceiver, $get_notice_process) = @_;

    # Create a new internet socket.
    # The option key must be spelled "Timeout": IO::Socket option names are
    # case-sensitive, so the previous "TimeOut" spelling was silently ignored
    # and no connect timeout was applied.
    my $link = IO::Socket::INET->new (
                                      PeerAddr => $toReceiver,
                                      PeerPort => $toChange,
                                      Proto    => 'tcp',
                                      Timeout  => 10,
                                      Reuse    => 1);

    # Kept as an explicit undef so existing callers see the same value
    return undef unless $link;

    # Set KeepAlive.  These calls are best-effort: their results are not
    # checked, so an unsupported option does not abort the connection.
    setsockopt($link, SOL_SOCKET, SO_KEEPALIVE,  pack("l", 1));
    setsockopt($link, Socket::IPPROTO_TCP(), TCP_KEEPIDLE,  pack("l", 30));
    setsockopt($link, Socket::IPPROTO_TCP(), TCP_KEEPCNT,   pack("l", 2));
    setsockopt($link, Socket::IPPROTO_TCP(), TCP_KEEPINTVL, pack("l", 30));

    # Increase the total connection count
    $AllAssociations++;

    # Create a connection end-point object
    my $asso = bless {
        sock                   => $link,
        rcvd_notification_proc => $get_notice_process,
    }, $collection;

    # Set up the callback to the rcv function, only when the caller
    # supplied a function to be notified with
    if ($get_notice_process) {
        my $retrieve = sub {_get($asso, 0)};
        define_result_manager ($link, "read" => $retrieve);
    }
    $asso;
}
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   107
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   108
# unplug
#
# Inputs
# $asso (Connection object)
#
# Outputs
# Result of closing the socket; nothing if the connection has no socket
#
# Description
# This function disconnects a connection and cleans up
sub unplug {
    my $asso = shift;

    # Detach the socket from the connection object
    my $link = delete $asso->{sock};
    # Already disconnected: leave the connection count untouched
    return unless defined($link);

    # Decrease the number of total connections.  This is done only after the
    # check above so that a repeated unplug of the same connection does not
    # count it twice.
    $AllAssociations--;

    # Set to not try and check for reads and writes of this socket
    define_result_manager ($link, "write" => undef, "read" => undef);
    close($link);
}
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   130
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   131
# transmit_immediately
#
# Inputs
# $asso (Connection object)
# $content (Message to send)
#
# Outputs
# Whatever _transmit returns for the flush
#
# Description
# This function queues the message and flushes the queue straight away;
# this will block if the socket is not writeable
sub transmit_immediately {
    my $asso    = shift;
    my $content = shift;

    # Queue first, then push everything out in flush mode (1 ==> flush)
    _lineup ($asso, $content);
    return $asso->_transmit (1);
}
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   149
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   150
# transmit_afterwards
#
# Inputs
# $asso (Connection object)
# $content (Message to send)
#
# Outputs
#
# Description
# This function queues the message for sending at a later time and does not
# block if the socket is not writeable.
# It registers a "write" callback that sends the queued data
sub transmit_afterwards {
    my $asso = shift;
    my $content = shift;

    # The message is queued even if the connection has no socket any more
    _lineup($asso, $content);

    my $sock = $asso->{sock};
    if (defined($sock)) {
        # Deferred (non-flush) send, run through the registered callback
        my $on_writable = sub {$asso->_transmit(0)};
        return define_result_manager ($sock, "write" => $on_writable);
    }
    return;
}
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   172
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   173
# _lineup
#
# Inputs
# $asso (Connection object)
# $content (Message to send)
#
# Outputs
# Number of messages in the queue after the append
#
# Description
# This is a private function to place the message on the queue for this socket
sub _lineup {
    my ($asso, $content) = @_;

    # Frame = 4-byte length (network long) followed by the message itself
    my $frame = pack ('N', length($content)) . $content;

    # The queue array is created on first use
    push (@{$asso->{queue}}, $frame);
}
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   191
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   192
# _transmit
#
# Inputs
# $asso (Connection object)
# $remove (True to flush the whole queue, 0 for deferred mode)
#
# Outputs
# 1 on success or when the send has to be resumed later, 0 on a send error,
# nothing if the connection has no socket
#
# Description
# This is a private function that sends the queued data
sub _transmit {
    my ($asso, $remove) = @_;

    my $sock = $asso->{sock};
    return unless defined($sock);
    my $pending = $asso->{queue};

    # Flush mode: switch the socket to blocking and send every queued
    # message, returning early only on an error.
    # Deferred mode: switch the socket to non-blocking and go back to the
    # event loop after each message, or as soon as a write would block.
    if ($remove) {
        $asso->define_obstructing();
    }
    else {
        $asso->define_not_obstructing();
    }

    # Resume a partially sent message where the previous call stopped
    my $offset = 0;
    $offset = $asso->{send_offset} if exists $asso->{send_offset};

    MESSAGE:
    while (@$pending) {
        my $frame     = $pending->[0];
        my $remaining = length($frame) - $offset;

        while ($remaining) {
            my $written = syswrite ($sock, $frame, $remaining, $offset);
            unless (defined($written)) {
                unless (_faultwillObstruct($!)) {
                    # Real error: report it; the message remains in the queue
                    $asso->manage_transmitted_fault($!);
                    return 0;
                }
                # Would block (expected only in deferred mode): remember how
                # much has gone out.  The write handler should already be
                # set, so sending resumes on the next callback.
                $asso->{send_offset} = $offset;
                return 1;
            }
            $offset    += $written;
            $remaining -= $written;
        }

        # Whole message sent: forget the offset and drop it from the queue
        delete $asso->{send_offset};
        $offset = 0;
        shift @$pending;

        # In deferred mode go back to select and wait for it to fire again
        last MESSAGE unless $remove;
    }

    # Ask to be called back if the queue has not been drained, otherwise
    # clear the write handler.
    my $resume = @$pending ? sub {$asso->_transmit(0)} : undef;
    define_result_manager ($sock, "write" => $resume);

    1;  # Success
}
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   255
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   256
# _faultwillObstruct
#
# Inputs
# $_[0] (Error value to test; the sender passes $!)
#
# Outputs
# True if the error equals EAGAIN, false otherwise; always 0 when the
# module-level blocking-support flag is off
#
# Description
# This is a private function that tells whether an error means the
# operation would have blocked
sub _faultwillObstruct {
    # Without blocking support EAGAIN is not consulted at all
    return 0 unless $obstructing_maintained;

    return ($_[0] == EAGAIN());
}
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   271
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   272
# define_not_obstructing
#
# Inputs
# $_[0] (Connection object, or the connection socket itself)
#
# Outputs
#
# Description
# This function sets non-blocking mode on a connection's socket.
# It does nothing unless the module-level blocking-support flag is set.
sub define_not_obstructing {                        # $asso->define_not_obstructing
    return unless $obstructing_maintained;

    # The sender calls this as a method, so the argument is normally the
    # connection object (a blessed hash holding the socket under {sock}),
    # not a file handle.  A raw socket is accepted as well.
    my $target = $_[0];
    my $handle = UNIVERSAL::isa($target, 'HASH') ? $target->{sock} : $target;
    return unless defined($handle);

    # preserve other fcntl flags.  The constants are called through the
    # POSIX package so they resolve whether or not they were imported here.
    my $pins = fcntl ($handle, POSIX::F_GETFL(), 0);
    return unless defined($pins);
    fcntl ($handle, POSIX::F_SETFL(), $pins | POSIX::O_NONBLOCK());
}
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   288
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   289
# define_obstructing
#
# Inputs
# $_[0] (Connection object, or the connection socket itself)
#
# Outputs
#
# Description
# This function sets blocking mode on a connection's socket.
# It does nothing unless the module-level blocking-support flag is set.
sub define_obstructing {                            # $asso->define_obstructing
    return unless $obstructing_maintained;

    # The sender calls this as a method, so the argument is normally the
    # connection object (a blessed hash holding the socket under {sock}),
    # not a file handle.  A raw socket is accepted as well.
    my $target = $_[0];
    my $handle = UNIVERSAL::isa($target, 'HASH') ? $target->{sock} : $target;
    return unless defined($handle);

    # The constants are called through the POSIX package so they resolve
    # whether or not they were imported here.
    my $pins = fcntl ($handle, POSIX::F_GETFL(), 0);
    return unless defined($pins);
    $pins  &= ~POSIX::O_NONBLOCK(); # Clear non-blocking, but preserve other flags
    fcntl ($handle, POSIX::F_SETFL(), $pins);
}
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   305
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   306
# manage_transmitted_fault
#
# Inputs
# $asso (Connection object)
# $fault_content (Error message)
#
# Outputs
#
# Description
# This function warns about a send error and clears the "write" handler
# registered for the connection's socket
sub manage_transmitted_fault {
   # For more meaningful handling of send errors, subclass Msg and
   # rebless $asso.
   my $asso = shift;
   my $fault_content = shift;

   warn "Error while sending: $fault_content \n";

   my $sock = $asso->{sock};
   return define_result_manager ($sock, "write" => undef);
}
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   323
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   324
#-----------------------------------------------------------------
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   325
# Receive side routines
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   326
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   327
# recent_agent
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   328
#
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   329
# Inputs
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   330
# $collection (Package)
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   331
# $mi_receiver (Hostname of the interface to use)
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   332
# $changes (Reference to a list of port numbers to try in turn; passed before $mi_receiver)
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   333
# $enter_process (Reference to function to call when accepting a connection)
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   334
#
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   335
# Outputs
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   336
#
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   337
# Description
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   338
# This function creates a listening socket
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   339
my ($g_enter_process,$g_collection);
my $primary_plug = 0;
# recent_agent
#
# Creates the listening socket, registers it with the event loop and
# remembers the package name and login callback for later connections.
#
# Arguments (in call order):
#   $collection        - package name new connection objects are blessed into
#   $changes           - reference to a list of port numbers; they are tried
#                        in rotation until one can be bound.  The list is
#                        rotated in place.  It must not be empty when a retry
#                        limit is given, because the limit is computed by
#                        dividing by the number of ports.
#   $mi_receiver       - local address to bind to
#   $enter_process     - code reference called for each accepted connection
#   $iAssociationBreak - optional seconds to sleep after a failed attempt
#                        (defaults to 0)
#   $PlugAssociations  - optional number of passes over the port list before
#                        giving up (defaults to -1, meaning retry forever)
#
# Dies if fewer than four arguments are supplied, or if the retry limit is
# reached without a socket being created.
sub recent_agent {
    @_ >= 4 || die "Msg->recent_agent (ports, myhost, login_proc)\n";
    my ($collection, $changes, $mi_receiver, $enter_process, $iAssociationBreak, $PlugAssociations) = @_;

    # Count attempts from zero so the division below never operates on undef.
    my $RepeatNumber = 0;

    # Set a default Socket timeout value
    $iAssociationBreak = 0 if (!defined $iAssociationBreak);
    # Set a default Socket retry to be forever
    $PlugAssociations = -1 if (!defined $PlugAssociations);

    while(!$primary_plug)
    {
        # Check to see if there is a retry limit and if the limit has been reached
        if ($PlugAssociations != -1)
        {
            # Attempts are counted per port, so convert them to passes over the list.
            my $passes = $RepeatNumber / scalar(@$changes);
            if ($passes >= $PlugAssociations)
            {
                die "ERROR: could not create socket after ".$passes." attempts";
            }
            # Increment the number of retries
            $RepeatNumber++;
        }

        # Try the first port on the list, then put it on the back of the queue
        my $mi_change = shift(@$changes);
        push @$changes,$mi_change;

        print "Using port number $mi_change\n";
        # NOTE(review): IO::Socket documents the option as "Timeout"; the
        # "TimeOut" spelling below is presumably ignored -- confirm before
        # changing it, since a real timeout would also apply to accept().
        $primary_plug = IO::Socket::INET->new (
                                              LocalAddr => $mi_receiver,
                                              LocalPort => $mi_change,
                                              Listen    => 5,
                                              Proto     => 'tcp',
                                              TimeOut =>    10,
                                              Reuse     => 1);
        sleep $iAssociationBreak if (!$primary_plug);
    }

    # Set KeepAlive
    setsockopt($primary_plug, SOL_SOCKET, SO_KEEPALIVE,  pack("l", 1));
    setsockopt($primary_plug, &Socket::IPPROTO_TCP, TCP_KEEPIDLE,  pack("l", 30));
    setsockopt($primary_plug, &Socket::IPPROTO_TCP, TCP_KEEPCNT,   pack("l", 2));
    setsockopt($primary_plug, &Socket::IPPROTO_TCP, TCP_KEEPINTVL, pack("l", 30));

    # Add the socket to the list of filehandles to read from.
    define_result_manager ($primary_plug, "read" => \&_recent_node);
    # Store the package name and login proc for later use
    $g_enter_process = $enter_process; $g_collection = $collection;
}
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   391
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   392
sub get_immediately {
    # Receive one message right away by calling _get with the "receive now"
    # flag set.  List context yields (message, error); scalar context yields
    # the message only.
    my $asso = shift;

    my ($content, $fault) = _get($asso, 1);    # 1 ==> rcv now

    return ($content, $fault) if wantarray;
    return $content;
}
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   397
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   398
sub _get {                     # Complement to _transmit
    # Reads one length-prefixed message from the connection's socket.
    #
    # A message is a 4-byte length (unpacked with 'N') followed by that many
    # bytes of payload.
    #
    # Arguments:
    #   $asso            - connection object; uses its {sock}, {msg},
    #                      {bytes_to_read} and {rcvd_notification_proc} entries
    #   $get_immediately - true: read the whole message now and return
    #                      (message, $!); false: hand (connection, message, $!)
    #                      to the connection's rcvd_notification_proc instead
    #
    # Returns nothing if the connection has no socket, or if a read reported
    # that it would block; in the latter case the partial message is parked
    # on the connection and picked up by the next call.
    # An empty message (including a failed or short length read) makes the
    # connection unplug itself.
    my ($asso, $get_immediately) = @_; # $get_immediately complement of $remove
    # Find out how much has already been received, if at all
    my ($content, $branch, $sequencetoScan, $sequence_scan);
    my $link = $asso->{sock};
    return unless defined($link);
    if (exists $asso->{msg}) {
        $content        = $asso->{msg};
        # sysread writes at the given offset, so resuming must start right
        # after the bytes already held; length - 1 would overwrite the last
        # received byte (and is outside the string when nothing was held).
        $branch         = length($content);
        $sequencetoScan = $asso->{bytes_to_read};
        delete $asso->{'msg'};              # have made a copy
    } else {
        # The typical case ...
        $content        = "";               # Otherwise -w complains
        $branch         = 0;
        $sequencetoScan = 0;                # Will get set soon
    }
    # We want to read the message length in blocking mode. Quite
    # unlikely that we'll get blocked too long reading 4 bytes
    if (!$sequencetoScan)  {                 # Get new length
        my $storage;
        $asso->define_obstructing();
        # $! is only meaningful after a failure; clear it so an error left
        # over from an earlier call cannot make a good read look bad.
        $! = 0;
        $sequence_scan = sysread($link, $storage, 4);
        if ($! || ($sequence_scan != 4)) {
            goto FINISH;
        }
        $sequencetoScan = unpack ('N', $storage);
    }
    $asso->define_not_obstructing() unless $get_immediately;
    while ($sequencetoScan) {
        $sequence_scan = sysread ($link, $content, $sequencetoScan, $branch);
        if (defined ($sequence_scan)) {
            if ($sequence_scan == 0) {
                # End of file before the full message arrived
                last;
            }
            $sequencetoScan -= $sequence_scan;
            $branch         += $sequence_scan;
        } else {
            if (_faultwillObstruct($!)) {
                # Should come here only in non-blocking mode
                $asso->{msg}           = $content;
                $asso->{bytes_to_read} = $sequencetoScan;
                return ;   # .. _get will be called later
                           # when socket is readable again
            } else {
                last;
            }
        }
    }

  FINISH:
    if (length($content) == 0) {
        $asso->unplug();
    }
    if ($get_immediately) {
        return ($content, $!);
    } else {
        &{$asso->{rcvd_notification_proc}}($asso, $content, $!);
    }
}
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   458
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   459
sub _recent_node {
    # Read handler for the listening socket: accepts one pending connection,
    # wraps it in a connection object and asks the login callback whether to
    # keep it.
    my $link = $primary_plug->accept();
    unless ($link) {
        # accept can fail; without a socket there is nothing to bless or
        # query, and calling methods on undef would abort the event loop.
        warn "Error while accepting connection: $!\n";
        return;
    }
    $AllAssociations++;
    my $asso = bless {
        'sock'  => $link,
        'state' => 'connected'
    }, $g_collection;

    # The login callback returns the receive-notification routine for this
    # connection, or a false value to reject it.
    my $get_notice_process =
        $g_enter_process->($asso, $link->peerhost(), $link->peerport());
    if ($get_notice_process) {
        $asso->{rcvd_notification_proc} = $get_notice_process;
        # Read incoming messages through the notification path (flag 0)
        my $retrieve = sub {_get($asso,0)};
        define_result_manager ($link, "read" => $retrieve);
    } else {  # Login failed
        $asso->unplug();
    }
}
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   476
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   477
#----------------------------------------------------
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   478
# Event loop routines used by both client and server
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   479
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   480
sub define_result_manager {
    # Registers or removes the "write" and/or "read" callbacks for a handle.
    #
    # May be called as a plain function or as a class method; a leading
    # non-reference argument (the package name) is discarded.  The remaining
    # arguments are the handle followed by key/value pairs.  A true callback
    # is stored and the handle added to the matching select set; a false
    # one (for example undef) removes both.
    shift unless ref($_[0]); # shift if first arg is package name
    my ($handle, %handlers) = @_;

    if (exists $handlers{'write'}) {
        if (my $on_writable = $handlers{'write'}) {
            $publish_retrieves{$handle} = $on_writable;
            $publish_manages->add($handle);
        } else {
            delete $publish_retrieves{$handle};
            $publish_manages->remove($handle);
        }
    }

    if (exists $handlers{'read'}) {
        if (my $on_readable = $handlers{'read'}) {
            $scan_retrieves{$handle} = $on_readable;
            $scan_manages->add($handle);
        } else {
            delete $scan_retrieves{$handle};
            $scan_manages->remove($handle);
        }
    }
}
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   505
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   506
sub result_iteration {
    # Event loop: waits for registered handles to become readable or
    # writable and invokes their callbacks.
    #
    # Arguments:
    #   $collection          - package name (unused here)
    #   $starting_scan_break - optional timeout in seconds applied to the
    #                          first wait only; if nothing is ready in time a
    #                          warning is printed and the loop ends
    #   $iteration_number    - optional number of passes to make before
    #                          returning; loops until no handles remain when
    #                          not given
    my ($collection, $starting_scan_break, $iteration_number) = @_;

    # Keep going only while there are handles left to process
    while ($scan_manages->count() || $publish_manages->count()) {
        # With no initial timeout pending the timeout argument is undef
        my ($readable, $writable) =
            IO::Select->select ($scan_manages, $publish_manages, undef, $starting_scan_break);

        if (defined $starting_scan_break) {
            # The first wait must produce something within the timeout,
            # otherwise give up
            unless (defined $readable) {
                print "WARNING: no response from server within $starting_scan_break seconds\n";
                last;
            }
            # The initial timeout applies once only
            $starting_scan_break = undef;
        }

        for my $handle (@$readable) {
            $scan_retrieves{$handle}->($handle) if exists $scan_retrieves{$handle};
        }
        for my $handle (@$writable) {
            $publish_retrieves{$handle}->($handle) if exists $publish_retrieves{$handle};
        }

        # Stop once the requested number of passes has been made
        last if defined($iteration_number) && !--$iteration_number;
    }
}
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   537
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   538
1;
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   539
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   540
__END__
3f65fd25dfd4 sync up SVN codes
kelvzhu
parents:
diff changeset
   541