--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/hti/PC_Tools/HTIGateway/HtiGateway/src/IPComm.cpp Wed Oct 13 16:17:58 2010 +0300
@@ -0,0 +1,518 @@
+/*
+* Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies).
+* All rights reserved.
+* This component and the accompanying materials are made available
+* under the terms of "Eclipse Public License v1.0"
+* which accompanies this distribution, and is available
+* at the URL "http://www.eclipse.org/legal/epl-v10.html".
+*
+* Initial Contributors:
+* Nokia Corporation - initial contribution.
+*
+* Contributors:
+*
+* Description:
+* This file contains the header file of the IPCommPlugin,
+* IPCommReaderThread, IPCommWriterThread and IPCommMonitorThread classes.
+*/
+
+// INCLUDES
+#include "socket.h"
+#include "IPCommPlugin.h"
+#include "util.h"
+
+
+const static int g_IPMaxResendNumber = 2;
+
+//**********************************************************************************
+// Class IPCommPlugin
+//
+// This class implements a CommChannelPlugin which is used to communicate with device using TCP/IP
+//**********************************************************************************
+
+IPCommPlugin::IPCommPlugin(const CommChannelPluginObserver* observer)
+ : CommChannelPlugin(observer),
+ m_TxQueue(),
+ m_RxQueue(),
+ m_PropertyLocalPort(0),
+ m_PropertyRemotePort(0)
+{
+ m_MonitorThread = NULL;
+}
+
+IPCommPlugin::~IPCommPlugin()
+{
+ Util::Debug("IPCommPlugin::~IPCommPlugin()");
+ if (m_Open)
+ {
+ Close();
+ }
+ if (m_MonitorThread != NULL)
+ {
+ delete m_MonitorThread;
+ m_MonitorThread = NULL;
+ }
+}
+
+/*
+ * This method initializes IPCommPlugin and Starts IPCommMonitorThread
+ */
+DWORD IPCommPlugin::Init()
+{
+ Util::Debug("IPCommPlugin::Init()");
+
+ std::string filename = IP_INI_FILE_NAME;
+ map<string, string> IPCommPluginProperties;
+ Util::ReadProperties(filename.c_str(), IPCommPluginProperties);
+
+ CheckProperties(IPCommPluginProperties);
+
+ CommChannelPlugin::Init();
+
+ m_MonitorThread = new IPCommMonitorThread(&m_TxQueue,
+ &m_RxQueue,
+ m_PropertyLocalPort,
+ m_PropertyRemoteHost,
+ m_PropertyRemotePort,
+ m_PropertyRecvBufferSize);
+
+ m_MonitorThread->Start();
+
+ Util::Debug("IPCommPlugin::Init() IPComm opened");
+ m_Open = true;
+ Util::Debug("IPCommPlugin::Init() OK");
+ return NO_ERRORS;
+}
+
+/*
+ * This method initializes class member variables from values in map
+ */
+void IPCommPlugin::CheckProperties(map<string, string>& props)
+{
+ char tmp[256];
+
+ // Local port
+ string val = props[IP_INI_LOCAL_PORT_PARAM];
+ Util::CheckCommandlineParam( PARAM_SWITCH_LOCAL_PORT_PARAM, val );
+ if (!val.empty())
+ {
+ m_PropertyLocalPort = atol(val.c_str());
+ }
+
+ // Receive TCP/IP buffer size
+ val = props[IP_INI_RECV_BUFFER_SIZE_PARAM];
+ Util::CheckCommandlineParam( PARAM_SWITCH_RECV_BUFFER_SIZE_PARAM, val );
+ if (!val.empty())
+ {
+ m_PropertyRecvBufferSize = atol(val.c_str());
+ }
+ else
+ {
+ // Use 8*1024 bytes (8KB) as default value
+ m_PropertyRecvBufferSize = 8*1024;
+ }
+
+ if( m_PropertyLocalPort )
+ {
+ sprintf(tmp, "[IPComm] Local port : %d", m_PropertyLocalPort );
+ string s(tmp);
+ Util::Info(s);
+ }
+ else
+ {
+ // Remote host
+ m_PropertyRemoteHost = props[IP_INI_REMOTE_HOST_PARAM];
+ //Check and replace if -REMOTE_HOST was given as command line parameter
+ Util::CheckCommandlineParam( PARAM_SWITCH_REMOTE_HOST_PARAM, m_PropertyRemoteHost );
+ if(m_PropertyRemoteHost.empty())
+ {
+ throw "No remote host specified!";
+ }
+ sprintf(tmp, "[IPComm] Remote host : '%s'", m_PropertyRemoteHost.c_str());
+ string s = tmp;
+ Util::Info(s);
+
+ // Remote port
+ val = props[IP_INI_REMOTE_PORT_PARAM];
+ //Check and replace if -REMOTE_POST was given as command line parameter
+ Util::CheckCommandlineParam( PARAM_SWITCH_REMOTE_PORT_PARAM, val );
+ if (!val.empty())
+ {
+ m_PropertyRemotePort = atol(val.c_str());
+ }
+ if( m_PropertyRemotePort == 0)
+ throw "Invalid remote port specified!";
+
+ sprintf(tmp, "[IPComm] Remote port : %d", m_PropertyRemotePort );
+ s = tmp;
+ Util::Info(s);
+ }
+}
+
+/*
+ * This method checks if data is available on incoming queue
+ */
+bool IPCommPlugin::IsDataAvailable()
+{
+ return (!m_RxQueue.empty());
+}
+
+/*
+ * This method is used to push given data to outgoing queue and then
+ * wait for data to become available and read all data into single Data object
+ */
+DWORD IPCommPlugin::SendReceive(Data* data_in, Data** data_out, long timeout)
+{
+ DWORD res;
+ if ((res = Send(data_in, timeout)) == NO_ERRORS &&
+ (res = ReceiveWait(data_out, timeout)) == NO_ERRORS)
+ {
+ return NO_ERRORS;
+ }
+ cout << "IPCommPlugin::SendReceive: error" << endl;
+ return res;
+}
+
+/*
+ * This method pushes the given Data object(of type Data::EData) to outgoing queue
+ */
+DWORD IPCommPlugin::Send(Data* data_in, long timeout)
+{
+ Data::DataType type = data_in->GetType();
+ if (type == Data::EData)
+ {
+ DWORD length = data_in->GetLength();
+ m_TxQueue.push(data_in);
+ return NO_ERRORS;
+ }
+ else if (type == Data::EControl)
+ {
+ Util::Debug("IPCommPlugin::Send: Control Message");
+ return NO_ERRORS;
+ }
+ return ERR_DG_COMM_DATA_SEND;
+}
+
+/*
+ * This method is used to wait for data to become available in incoming queue
+ * and then read all data into single Data object which is given as parameter
+ */
+DWORD IPCommPlugin::ReceiveWait(Data** data_out, long timeout)
+{
+ long elapsed = 0;
+ while (elapsed < timeout && !IsDataAvailable())
+ {
+ elapsed += 25;
+ Sleep(25);
+ }
+ if (elapsed >= timeout)
+ {
+ return ERR_DG_COMM_DATA_RECV_TIMEOUT;
+ }
+ return Receive(data_out, timeout);
+}
+
+/*
+ * This method is used to read all data in incoming queue to single Data object and store the result
+ * to the data object given parameter
+ */
+DWORD IPCommPlugin::Receive(Data** data_out, long timeout)
+{
+ if (!m_RxQueue.empty())
+ {
+ *data_out = m_RxQueue.front();
+ m_RxQueue.pop();
+ return NO_ERRORS;
+ }
+ return ERR_DG_COMM_DATA_RECV;
+}
+
+
+DWORD IPCommPlugin::Open()
+{
+ return (m_Open ? NO_ERRORS : ERR_DG_COMM_OPEN);
+}
+
+DWORD IPCommPlugin::Close()
+{
+ m_MonitorThread->Stop();
+ WaitForSingleObject(m_MonitorThread->ThreadHandle(), g_MaximumShutdownWaitTime);
+ return NO_ERRORS;
+}
+
+
+//**********************************************************************************
+// Class IPCommReaderThread
+//
+// This thread is used to read bytes from TCP/IP socket, encapsulate the bytes to Data objects
+// and push them to incoming queue
+//**********************************************************************************
+
+IPCommReaderThread::IPCommReaderThread(SafeQueue<Data*>* q,
+ long bufsize)
+ :m_Running(false),
+ m_Socket(NULL)
+{
+ m_Queue = q;
+ m_TcpIpBufferSize = bufsize;
+}
+
+/*
+ * Main execution loop which reads bytes from socket, encapsulates the bytes to Data object and pushes them to incoming queue
+ */
+void IPCommReaderThread::Run()
+{
+ if( m_Socket )
+ m_Running = true;
+
+ BYTE* buffer = new BYTE[m_TcpIpBufferSize];
+ while (m_Running)
+ {
+ // Reading from TCP/IP port
+ //Util::Debug("[IPCommReaderThread] try to read");
+ int bytes_read = -1;
+ bytes_read = m_Socket->ReceiveBytes(buffer, m_TcpIpBufferSize);
+ if (bytes_read < 0)
+ {
+ Stop();
+ break;
+ }
+ if (bytes_read > 0)
+ {
+ Data* d = new Data((void *)buffer, bytes_read, Data::EData);
+ if (Util::GetVerboseLevel() == Util::VerboseLevel::debug)
+ {
+ char tmp[64];
+ sprintf(tmp, "m_Socket->ReceiveBytes (%d (dec) bytes):", d->GetLength());
+ string s(tmp);
+ Util::Debug(s);
+ }
+ m_Queue->push(d);
+ d = NULL;
+ }
+ Sleep(0);
+ }
+ delete[] buffer;
+ buffer = NULL;
+}
+
+void IPCommReaderThread::Stop()
+{
+ m_Running = false;
+}
+
+bool IPCommReaderThread::IsRunning()
+{
+ return m_Running;
+}
+
+
+
+//**********************************************************************************
+// Class DataGatewaySocketWriterThread
+//
+// This thread is used to write data from outgoing queue to TCP/IP socket
+//**********************************************************************************
+
+IPCommWriterThread::IPCommWriterThread(SafeQueue<Data*>* q)
+ :m_Running(false),
+ m_Socket(NULL)
+{
+ m_Queue = q;
+}
+
+/*
+ * This method contains the main execution loop which gets Data from outgoing queue and sends it to socket
+ */
+void IPCommWriterThread::Run()
+{
+ if( m_Socket )
+ m_Running = true;
+
+ while (m_Running)
+ {
+ // Sending to TCP/IP port
+ //Util::Debug("[IPCommWriterThread] try to send");
+ try
+ {
+ Data* d = m_Queue->front(50);
+ char* p = (char *)d->GetData();
+ DWORD l = d->GetLength();
+
+ if (Util::GetVerboseLevel() == Util::VerboseLevel::debug)
+ {
+ char tmp[64];
+ sprintf(tmp, "[IPCommWriterThread] HTI MsgSize = %d", l);
+ string s(tmp);
+ Util::Debug(s);
+ }
+
+ m_Socket->SendBytes((const unsigned char *)p, l);
+ m_Queue->pop();
+ delete d;
+ d = NULL;
+
+ } catch (TimeoutException te)
+ {
+ //Util::Debug("[IPCommWriterThread]timeout exception");
+ }
+ }
+}
+
+void IPCommWriterThread::Stop()
+{
+ m_Running = false;
+}
+
+bool IPCommWriterThread::IsRunning()
+{
+ return m_Running;
+}
+
+
+//**********************************************************************************
+// Class IPCommMonitorThread
+//
+// This thread creates and starts reader and writer threads
+// The thread also monitors if reader and writer threads are running and restarts them in case either isn't running
+//**********************************************************************************
+
+IPCommMonitorThread::IPCommMonitorThread(SafeQueue<Data*>* TxQueue,
+ SafeQueue<Data*>* RxQueue,
+ int LocalPort,
+ string& RemoteHost,
+ int RemotePort,
+ long RecvBufferSize)
+ : m_Running(false),
+ m_TxQueue(TxQueue),
+ m_RxQueue(RxQueue),
+ m_ReaderThread(NULL),
+ m_WriterThread(NULL),
+ m_LocalPort(LocalPort),
+ m_RemoteHost(RemoteHost),
+ m_RemotePort(RemotePort),
+ m_RecvBufferSize(RecvBufferSize)
+{
+}
+
+IPCommMonitorThread::~IPCommMonitorThread()
+{
+ if(m_ReaderThread)
+ {
+ delete m_ReaderThread;
+ m_ReaderThread = NULL;
+ }
+ if(m_WriterThread)
+ {
+ delete m_WriterThread;
+ m_WriterThread = NULL;
+ }
+}
+
+/*
+ * This method has two functionalities
+ * -It waits for incoming connections if local port is defined
+ * -It tries to connect to remote host if local host is not defined
+ */
+void IPCommMonitorThread::Connect(Socket*& s)
+{
+ // This trickery here is because if there are no sockets (Socket::nofSockets_)
+ // WSACleanup gets called and then SOAP gets messed up.
+ // And creating a new socket for a new connection seems to work better
+ // than using the old when re-connecting / re-listening.
+ Socket* new_s = new Socket();
+ delete s;
+ s = new_s;
+
+ // If local port is defined start open listening socket
+ SocketServer ss;
+ if( m_LocalPort )
+ {
+ Util::Info("[IPComm] Listen for incoming connection...");
+ String remoteHost( "[IPComm] Connected! Remote host : " );
+ ss.Accept( s, m_LocalPort, 1, remoteHost );
+ Util::Info(remoteHost);
+ }
+ // If not start connecting
+ else
+ {
+ Util::Info("[IPComm] Connecting...");
+ ss.Connect( s, m_RemoteHost.c_str(), m_RemotePort );
+ Util::Info("[IPComm] Connected!");
+ }
+}
+
+/*
+ * Main execution loop of thread
+ * -Creates reader and writer threads and starts them
+ * -Monitors if either reader or writer thread aren't running and restarts them if not
+ */
+void IPCommMonitorThread::Run()
+{
+ Socket* s = NULL;
+
+ m_ReaderThread = new IPCommReaderThread( m_RxQueue, m_RecvBufferSize );
+ m_WriterThread = new IPCommWriterThread( m_TxQueue ) ;
+
+ m_Running = true;
+ while (m_Running)
+ {
+ // Reader thread should stop running when connection is lost
+ if( !m_ReaderThread->IsRunning() || !m_WriterThread->IsRunning() )
+ {
+ Util::Info( "[IPComm] Disconnected!" );
+
+ // Stop the treads
+ m_ReaderThread->Stop();
+ WaitForSingleObject(m_ReaderThread->ThreadHandle(), g_MaximumShutdownWaitTime);
+ m_WriterThread->Stop();
+ WaitForSingleObject(m_WriterThread->ThreadHandle(), g_MaximumShutdownWaitTime);
+
+ // Try to connect again.
+ BOOL connected = false;
+ while( m_Running && connected == false)
+ {
+ try{
+ Connect(s);
+ connected = true;
+ m_ReaderThread->m_Socket = s;
+ m_WriterThread->m_Socket = s;
+
+ // Start threads
+ m_ReaderThread->Start();
+ m_WriterThread->Start();
+ SetThreadPriority( m_ReaderThread->ThreadHandle(), THREAD_PRIORITY_LOWEST);
+ int priority = GetThreadPriority(m_ReaderThread->ThreadHandle());
+ int i = 0;
+ }
+ catch( char* ){
+ Sleep(1000);
+ }
+ }
+ }
+ Sleep(1000);
+ }
+
+ // Stop the treads
+ m_ReaderThread->Stop();
+ WaitForSingleObject(m_ReaderThread->ThreadHandle(), g_MaximumShutdownWaitTime);
+ m_WriterThread->Stop();
+ WaitForSingleObject(m_WriterThread->ThreadHandle(), g_MaximumShutdownWaitTime);
+ // and close the current socket
+ if( s )
+ delete s;
+}
+
+void IPCommMonitorThread::Stop()
+{
+ m_Running = false;
+ m_ReaderThread->Stop();
+ m_WriterThread->Stop();
+}
+
+bool IPCommMonitorThread::IsRunning()
+{
+ return m_Running;
+}
+
+// End of the file
\ No newline at end of file