--- a/hti/PC_Tools/HTIGateway/HtiGateway/src/IPComm.cpp Mon Jun 28 15:36:07 2010 +0300
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,518 +0,0 @@
-/*
-* Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies).
-* All rights reserved.
-* This component and the accompanying materials are made available
-* under the terms of "Eclipse Public License v1.0"
-* which accompanies this distribution, and is available
-* at the URL "http://www.eclipse.org/legal/epl-v10.html".
-*
-* Initial Contributors:
-* Nokia Corporation - initial contribution.
-*
-* Contributors:
-*
-* Description:
-* This file contains the header file of the IPCommPlugin,
-* IPCommReaderThread, IPCommWriterThread and IPCommMonitorThread classes.
-*/
-
-// INCLUDES
-#include "socket.h"
-#include "IPCommPlugin.h"
-#include "util.h"
-
-
-const static int g_IPMaxResendNumber = 2;
-
-//**********************************************************************************
-// Class IPCommPlugin
-//
-// This class implements a CommChannelPlugin which is used to communicate with device using TCP/IP
-//**********************************************************************************
-
-IPCommPlugin::IPCommPlugin(const CommChannelPluginObserver* observer)
- : CommChannelPlugin(observer),
- m_TxQueue(),
- m_RxQueue(),
- m_PropertyLocalPort(0),
- m_PropertyRemotePort(0)
-{
- m_MonitorThread = NULL;
-}
-
-IPCommPlugin::~IPCommPlugin()
-{
- Util::Debug("IPCommPlugin::~IPCommPlugin()");
- if (m_Open)
- {
- Close();
- }
- if (m_MonitorThread != NULL)
- {
- delete m_MonitorThread;
- m_MonitorThread = NULL;
- }
-}
-
-/*
- * This method initializes IPCommPlugin and Starts IPCommMonitorThread
- */
-DWORD IPCommPlugin::Init()
-{
- Util::Debug("IPCommPlugin::Init()");
-
- std::string filename = IP_INI_FILE_NAME;
- map<string, string> IPCommPluginProperties;
- Util::ReadProperties(filename.c_str(), IPCommPluginProperties);
-
- CheckProperties(IPCommPluginProperties);
-
- CommChannelPlugin::Init();
-
- m_MonitorThread = new IPCommMonitorThread(&m_TxQueue,
- &m_RxQueue,
- m_PropertyLocalPort,
- m_PropertyRemoteHost,
- m_PropertyRemotePort,
- m_PropertyRecvBufferSize);
-
- m_MonitorThread->Start();
-
- Util::Debug("IPCommPlugin::Init() IPComm opened");
- m_Open = true;
- Util::Debug("IPCommPlugin::Init() OK");
- return NO_ERRORS;
-}
-
-/*
- * This method initializes class member variables from values in map
- */
-void IPCommPlugin::CheckProperties(map<string, string>& props)
-{
- char tmp[256];
-
- // Local port
- string val = props[IP_INI_LOCAL_PORT_PARAM];
- Util::CheckCommandlineParam( PARAM_SWITCH_LOCAL_PORT_PARAM, val );
- if (!val.empty())
- {
- m_PropertyLocalPort = atol(val.c_str());
- }
-
- // Receive TCP/IP buffer size
- val = props[IP_INI_RECV_BUFFER_SIZE_PARAM];
- Util::CheckCommandlineParam( PARAM_SWITCH_RECV_BUFFER_SIZE_PARAM, val );
- if (!val.empty())
- {
- m_PropertyRecvBufferSize = atol(val.c_str());
- }
- else
- {
- // Use 8*1024 bytes (8KB) as default value
- m_PropertyRecvBufferSize = 8*1024;
- }
-
- if( m_PropertyLocalPort )
- {
- sprintf(tmp, "[IPComm] Local port : %d", m_PropertyLocalPort );
- string s(tmp);
- Util::Info(s);
- }
- else
- {
- // Remote host
- m_PropertyRemoteHost = props[IP_INI_REMOTE_HOST_PARAM];
- //Check and replace if -REMOTE_HOST was given as command line parameter
- Util::CheckCommandlineParam( PARAM_SWITCH_REMOTE_HOST_PARAM, m_PropertyRemoteHost );
- if(m_PropertyRemoteHost.empty())
- {
- throw "No remote host specified!";
- }
- sprintf(tmp, "[IPComm] Remote host : '%s'", m_PropertyRemoteHost.c_str());
- string s = tmp;
- Util::Info(s);
-
- // Remote port
- val = props[IP_INI_REMOTE_PORT_PARAM];
- //Check and replace if -REMOTE_POST was given as command line parameter
- Util::CheckCommandlineParam( PARAM_SWITCH_REMOTE_PORT_PARAM, val );
- if (!val.empty())
- {
- m_PropertyRemotePort = atol(val.c_str());
- }
- if( m_PropertyRemotePort == 0)
- throw "Invalid remote port specified!";
-
- sprintf(tmp, "[IPComm] Remote port : %d", m_PropertyRemotePort );
- s = tmp;
- Util::Info(s);
- }
-}
-
-/*
- * This method checks if data is available on incoming queue
- */
-bool IPCommPlugin::IsDataAvailable()
-{
- return (!m_RxQueue.empty());
-}
-
-/*
- * This method is used to push given data to outgoing queue and then
- * wait for data to become available and read all data into single Data object
- */
-DWORD IPCommPlugin::SendReceive(Data* data_in, Data** data_out, long timeout)
-{
- DWORD res;
- if ((res = Send(data_in, timeout)) == NO_ERRORS &&
- (res = ReceiveWait(data_out, timeout)) == NO_ERRORS)
- {
- return NO_ERRORS;
- }
- cout << "IPCommPlugin::SendReceive: error" << endl;
- return res;
-}
-
-/*
- * This method pushes the given Data object(of type Data::EData) to outgoing queue
- */
-DWORD IPCommPlugin::Send(Data* data_in, long timeout)
-{
- Data::DataType type = data_in->GetType();
- if (type == Data::EData)
- {
- DWORD length = data_in->GetLength();
- m_TxQueue.push(data_in);
- return NO_ERRORS;
- }
- else if (type == Data::EControl)
- {
- Util::Debug("IPCommPlugin::Send: Control Message");
- return NO_ERRORS;
- }
- return ERR_DG_COMM_DATA_SEND;
-}
-
-/*
- * This method is used to wait for data to become available in incoming queue
- * and then read all data into single Data object which is given as parameter
- */
-DWORD IPCommPlugin::ReceiveWait(Data** data_out, long timeout)
-{
- long elapsed = 0;
- while (elapsed < timeout && !IsDataAvailable())
- {
- elapsed += 25;
- Sleep(25);
- }
- if (elapsed >= timeout)
- {
- return ERR_DG_COMM_DATA_RECV_TIMEOUT;
- }
- return Receive(data_out, timeout);
-}
-
-/*
- * This method is used to read all data in incoming queue to single Data object and store the result
- * to the data object given parameter
- */
-DWORD IPCommPlugin::Receive(Data** data_out, long timeout)
-{
- if (!m_RxQueue.empty())
- {
- *data_out = m_RxQueue.front();
- m_RxQueue.pop();
- return NO_ERRORS;
- }
- return ERR_DG_COMM_DATA_RECV;
-}
-
-
-DWORD IPCommPlugin::Open()
-{
- return (m_Open ? NO_ERRORS : ERR_DG_COMM_OPEN);
-}
-
-DWORD IPCommPlugin::Close()
-{
- m_MonitorThread->Stop();
- WaitForSingleObject(m_MonitorThread->ThreadHandle(), g_MaximumShutdownWaitTime);
- return NO_ERRORS;
-}
-
-
-//**********************************************************************************
-// Class IPCommReaderThread
-//
-// This thread is used to read bytes from TCP/IP socket, encapsulate the bytes to Data objects
-// and push them to incoming queue
-//**********************************************************************************
-
-IPCommReaderThread::IPCommReaderThread(SafeQueue<Data*>* q,
- long bufsize)
- :m_Running(false),
- m_Socket(NULL)
-{
- m_Queue = q;
- m_TcpIpBufferSize = bufsize;
-}
-
-/*
- * Main execution loop which reads bytes from socket, encapsulates the bytes to Data object and pushes them to incoming queue
- */
-void IPCommReaderThread::Run()
-{
- if( m_Socket )
- m_Running = true;
-
- BYTE* buffer = new BYTE[m_TcpIpBufferSize];
- while (m_Running)
- {
- // Reading from TCP/IP port
- //Util::Debug("[IPCommReaderThread] try to read");
- int bytes_read = -1;
- bytes_read = m_Socket->ReceiveBytes(buffer, m_TcpIpBufferSize);
- if (bytes_read < 0)
- {
- Stop();
- break;
- }
- if (bytes_read > 0)
- {
- Data* d = new Data((void *)buffer, bytes_read, Data::EData);
- if (Util::GetVerboseLevel() == Util::VerboseLevel::debug)
- {
- char tmp[64];
- sprintf(tmp, "m_Socket->ReceiveBytes (%d (dec) bytes):", d->GetLength());
- string s(tmp);
- Util::Debug(s);
- }
- m_Queue->push(d);
- d = NULL;
- }
- Sleep(0);
- }
- delete[] buffer;
- buffer = NULL;
-}
-
-void IPCommReaderThread::Stop()
-{
- m_Running = false;
-}
-
-bool IPCommReaderThread::IsRunning()
-{
- return m_Running;
-}
-
-
-
-//**********************************************************************************
-// Class DataGatewaySocketWriterThread
-//
-// This thread is used to write data from outgoing queue to TCP/IP socket
-//**********************************************************************************
-
-IPCommWriterThread::IPCommWriterThread(SafeQueue<Data*>* q)
- :m_Running(false),
- m_Socket(NULL)
-{
- m_Queue = q;
-}
-
-/*
- * This method contains the main execution loop which gets Data from outgoing queue and sends it to socket
- */
-void IPCommWriterThread::Run()
-{
- if( m_Socket )
- m_Running = true;
-
- while (m_Running)
- {
- // Sending to TCP/IP port
- //Util::Debug("[IPCommWriterThread] try to send");
- try
- {
- Data* d = m_Queue->front(50);
- char* p = (char *)d->GetData();
- DWORD l = d->GetLength();
-
- if (Util::GetVerboseLevel() == Util::VerboseLevel::debug)
- {
- char tmp[64];
- sprintf(tmp, "[IPCommWriterThread] HTI MsgSize = %d", l);
- string s(tmp);
- Util::Debug(s);
- }
-
- m_Socket->SendBytes((const unsigned char *)p, l);
- m_Queue->pop();
- delete d;
- d = NULL;
-
- } catch (TimeoutException te)
- {
- //Util::Debug("[IPCommWriterThread]timeout exception");
- }
- }
-}
-
-void IPCommWriterThread::Stop()
-{
- m_Running = false;
-}
-
-bool IPCommWriterThread::IsRunning()
-{
- return m_Running;
-}
-
-
-//**********************************************************************************
-// Class IPCommMonitorThread
-//
-// This thread creates and starts reader and writer threads
-// The thread also monitors if reader and writer threads are running and restarts them in case either isn't running
-//**********************************************************************************
-
-IPCommMonitorThread::IPCommMonitorThread(SafeQueue<Data*>* TxQueue,
- SafeQueue<Data*>* RxQueue,
- int LocalPort,
- string& RemoteHost,
- int RemotePort,
- long RecvBufferSize)
- : m_Running(false),
- m_TxQueue(TxQueue),
- m_RxQueue(RxQueue),
- m_ReaderThread(NULL),
- m_WriterThread(NULL),
- m_LocalPort(LocalPort),
- m_RemoteHost(RemoteHost),
- m_RemotePort(RemotePort),
- m_RecvBufferSize(RecvBufferSize)
-{
-}
-
-IPCommMonitorThread::~IPCommMonitorThread()
-{
- if(m_ReaderThread)
- {
- delete m_ReaderThread;
- m_ReaderThread = NULL;
- }
- if(m_WriterThread)
- {
- delete m_WriterThread;
- m_WriterThread = NULL;
- }
-}
-
-/*
- * This method has two functionalities
- * -It waits for incoming connections if local port is defined
- * -It tries to connect to remote host if local host is not defined
- */
-void IPCommMonitorThread::Connect(Socket*& s)
-{
- // This trickery here is because if there are no sockets (Socket::nofSockets_)
- // WSACleanup gets called and then SOAP gets messed up.
- // And creating a new socket for a new connection seems to work better
- // than using the old when re-connecting / re-listening.
- Socket* new_s = new Socket();
- delete s;
- s = new_s;
-
- // If local port is defined start open listening socket
- SocketServer ss;
- if( m_LocalPort )
- {
- Util::Info("[IPComm] Listen for incoming connection...");
- String remoteHost( "[IPComm] Connected! Remote host : " );
- ss.Accept( s, m_LocalPort, 1, remoteHost );
- Util::Info(remoteHost);
- }
- // If not start connecting
- else
- {
- Util::Info("[IPComm] Connecting...");
- ss.Connect( s, m_RemoteHost.c_str(), m_RemotePort );
- Util::Info("[IPComm] Connected!");
- }
-}
-
-/*
- * Main execution loop of thread
- * -Creates reader and writer threads and starts them
- * -Monitors if either reader or writer thread aren't running and restarts them if not
- */
-void IPCommMonitorThread::Run()
-{
- Socket* s = NULL;
-
- m_ReaderThread = new IPCommReaderThread( m_RxQueue, m_RecvBufferSize );
- m_WriterThread = new IPCommWriterThread( m_TxQueue ) ;
-
- m_Running = true;
- while (m_Running)
- {
- // Reader thread should stop running when connection is lost
- if( !m_ReaderThread->IsRunning() || !m_WriterThread->IsRunning() )
- {
- Util::Info( "[IPComm] Disconnected!" );
-
- // Stop the treads
- m_ReaderThread->Stop();
- WaitForSingleObject(m_ReaderThread->ThreadHandle(), g_MaximumShutdownWaitTime);
- m_WriterThread->Stop();
- WaitForSingleObject(m_WriterThread->ThreadHandle(), g_MaximumShutdownWaitTime);
-
- // Try to connect again.
- BOOL connected = false;
- while( m_Running && connected == false)
- {
- try{
- Connect(s);
- connected = true;
- m_ReaderThread->m_Socket = s;
- m_WriterThread->m_Socket = s;
-
- // Start threads
- m_ReaderThread->Start();
- m_WriterThread->Start();
- SetThreadPriority( m_ReaderThread->ThreadHandle(), THREAD_PRIORITY_LOWEST);
- int priority = GetThreadPriority(m_ReaderThread->ThreadHandle());
- int i = 0;
- }
- catch( char* ){
- Sleep(1000);
- }
- }
- }
- Sleep(1000);
- }
-
- // Stop the treads
- m_ReaderThread->Stop();
- WaitForSingleObject(m_ReaderThread->ThreadHandle(), g_MaximumShutdownWaitTime);
- m_WriterThread->Stop();
- WaitForSingleObject(m_WriterThread->ThreadHandle(), g_MaximumShutdownWaitTime);
- // and close the current socket
- if( s )
- delete s;
-}
-
-void IPCommMonitorThread::Stop()
-{
- m_Running = false;
- m_ReaderThread->Stop();
- m_WriterThread->Stop();
-}
-
-bool IPCommMonitorThread::IsRunning()
-{
- return m_Running;
-}
-
-// End of the file
\ No newline at end of file