hti/PC_Tools/HTIGateway/HtiGateway/src/IPComm.cpp
branchRCL_3
changeset 59 8ad140f3dd41
parent 0 a03f92240627
equal deleted inserted replaced
49:7fdc9a71d314 59:8ad140f3dd41
       
     1 /*
       
     2 * Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies). 
       
     3 * All rights reserved.
       
     4 * This component and the accompanying materials are made available
       
     5 * under the terms of "Eclipse Public License v1.0"
       
     6 * which accompanies this distribution, and is available
       
     7 * at the URL "http://www.eclipse.org/legal/epl-v10.html".
       
     8 *
       
     9 * Initial Contributors:
       
    10 * Nokia Corporation - initial contribution.
       
    11 *
       
    12 * Contributors:
       
    13 * 
       
    14 * Description:
       
    15 *   This file contains the implementation of the IPCommPlugin,
       
    16 *	IPCommReaderThread, IPCommWriterThread and IPCommMonitorThread classes.
       
    17 */
       
    18 
       
    19 // INCLUDES
       
    20 #include "socket.h"
       
    21 #include "IPCommPlugin.h"
       
    22 #include "util.h"
       
    23 
       
    24 
       
    // File-local resend limit. It is not referenced by any code visible in
    // this file; presumably a leftover or used by code outside this view
    // (TODO confirm before removing).
    static const int g_IPMaxResendNumber = 2;
       
    26 
       
    27 //**********************************************************************************
       
    28 // Class IPCommPlugin
       
    29 //
       
    30 // This class implements a CommChannelPlugin which is used to communicate with device using TCP/IP
       
    31 //**********************************************************************************
       
    32 
       
    33 IPCommPlugin::IPCommPlugin(const CommChannelPluginObserver* observer)
       
    34     : CommChannelPlugin(observer),
       
    35       m_TxQueue(),
       
    36       m_RxQueue(),
       
    37 	  m_PropertyLocalPort(0),
       
    38 	  m_PropertyRemotePort(0)
       
    39 {
       
    40     m_MonitorThread = NULL;
       
    41 }
       
    42 
       
    43 IPCommPlugin::~IPCommPlugin()
       
    44 {
       
    45     Util::Debug("IPCommPlugin::~IPCommPlugin()");
       
    46     if (m_Open)
       
    47     {
       
    48         Close();
       
    49     }
       
    50     if (m_MonitorThread != NULL)
       
    51     {
       
    52         delete m_MonitorThread;
       
    53         m_MonitorThread = NULL;
       
    54     }
       
    55 }
       
    56 
       
    57 /*
       
    58  * This method initializes IPCommPlugin and Starts IPCommMonitorThread
       
    59  */
       
    60 DWORD IPCommPlugin::Init()
       
    61 {
       
    62     Util::Debug("IPCommPlugin::Init()");
       
    63 
       
    64     std::string filename = IP_INI_FILE_NAME;
       
    65 	map<string, string> IPCommPluginProperties;
       
    66 	Util::ReadProperties(filename.c_str(), IPCommPluginProperties);
       
    67 
       
    68 	CheckProperties(IPCommPluginProperties);
       
    69 
       
    70     CommChannelPlugin::Init();
       
    71 
       
    72 	m_MonitorThread = new IPCommMonitorThread(&m_TxQueue,
       
    73 		                                      &m_RxQueue,
       
    74 											  m_PropertyLocalPort,
       
    75 											  m_PropertyRemoteHost,
       
    76 											  m_PropertyRemotePort,
       
    77 											  m_PropertyRecvBufferSize);
       
    78 
       
    79 	m_MonitorThread->Start();
       
    80 
       
    81     Util::Debug("IPCommPlugin::Init() IPComm opened");
       
    82     m_Open = true;
       
    83     Util::Debug("IPCommPlugin::Init() OK");
       
    84     return NO_ERRORS;
       
    85 }
       
    86 
       
    87 /*
       
    88  * This method initializes class member variables from values in map
       
    89  */
       
    90 void IPCommPlugin::CheckProperties(map<string, string>& props)
       
    91 {
       
    92     char tmp[256];
       
    93 
       
    94     // Local port
       
    95     string val = props[IP_INI_LOCAL_PORT_PARAM];
       
    96 	Util::CheckCommandlineParam( PARAM_SWITCH_LOCAL_PORT_PARAM, val );
       
    97     if (!val.empty())
       
    98     {
       
    99         m_PropertyLocalPort = atol(val.c_str());
       
   100     }
       
   101 
       
   102     // Receive TCP/IP buffer size
       
   103     val = props[IP_INI_RECV_BUFFER_SIZE_PARAM];
       
   104 	Util::CheckCommandlineParam( PARAM_SWITCH_RECV_BUFFER_SIZE_PARAM, val );
       
   105     if (!val.empty())
       
   106     {
       
   107         m_PropertyRecvBufferSize = atol(val.c_str());
       
   108     }
       
   109 	else
       
   110 	{
       
   111 		// Use 8*1024 bytes (8KB) as default value
       
   112 		m_PropertyRecvBufferSize = 8*1024; 
       
   113 	}
       
   114 
       
   115 	if( m_PropertyLocalPort )
       
   116 	{
       
   117 		sprintf(tmp, "[IPComm] Local port : %d", m_PropertyLocalPort );
       
   118 		string s(tmp);
       
   119 		Util::Info(s);
       
   120 	}
       
   121 	else
       
   122 	{
       
   123 		// Remote host
       
   124 		m_PropertyRemoteHost = props[IP_INI_REMOTE_HOST_PARAM];
       
   125 		//Check and replace if -REMOTE_HOST was given as command line parameter
       
   126 		Util::CheckCommandlineParam( PARAM_SWITCH_REMOTE_HOST_PARAM, m_PropertyRemoteHost );
       
   127 		if(m_PropertyRemoteHost.empty())
       
   128 		{
       
   129 			throw "No remote host specified!";
       
   130 		}
       
   131 		sprintf(tmp, "[IPComm] Remote host : '%s'", m_PropertyRemoteHost.c_str());
       
   132 		string s = tmp;
       
   133 		Util::Info(s);
       
   134 
       
   135 		// Remote port
       
   136 		val = props[IP_INI_REMOTE_PORT_PARAM];
       
   137 		//Check and replace if -REMOTE_POST was given as command line parameter		
       
   138 		Util::CheckCommandlineParam( PARAM_SWITCH_REMOTE_PORT_PARAM, val );
       
   139 		if (!val.empty())
       
   140 		{
       
   141 			m_PropertyRemotePort = atol(val.c_str());
       
   142 		}
       
   143 		if( m_PropertyRemotePort == 0)
       
   144 			throw "Invalid remote port specified!";
       
   145 
       
   146 		sprintf(tmp, "[IPComm] Remote port : %d", m_PropertyRemotePort );
       
   147 		s = tmp;
       
   148 		Util::Info(s);
       
   149 	}
       
   150 }
       
   151 
       
   152 /*
       
   153  * This method checks if data is available on incoming queue
       
   154  */
       
   155 bool IPCommPlugin::IsDataAvailable()
       
   156 {
       
   157     return (!m_RxQueue.empty());
       
   158 }
       
   159 
       
   160 /*
       
   161  * This method is used to push given data to outgoing queue and then 
       
   162  * wait for data to become available and read all data into single Data object 
       
   163  */
       
   164 DWORD IPCommPlugin::SendReceive(Data* data_in, Data** data_out, long timeout)
       
   165 {
       
   166     DWORD res;
       
   167     if ((res = Send(data_in, timeout)) == NO_ERRORS &&
       
   168         (res = ReceiveWait(data_out, timeout)) == NO_ERRORS)
       
   169     {
       
   170         return NO_ERRORS;
       
   171     }
       
   172     cout << "IPCommPlugin::SendReceive: error" << endl;
       
   173     return res;
       
   174 }
       
   175 
       
   176 /*
       
   177  * This method pushes the given Data object(of type Data::EData) to outgoing queue
       
   178  */
       
   179 DWORD IPCommPlugin::Send(Data* data_in, long timeout)
       
   180 {
       
   181     Data::DataType type = data_in->GetType();
       
   182     if (type == Data::EData)
       
   183     {
       
   184         DWORD length = data_in->GetLength();
       
   185         m_TxQueue.push(data_in);
       
   186         return NO_ERRORS;
       
   187     }
       
   188     else if (type == Data::EControl)
       
   189     {
       
   190         Util::Debug("IPCommPlugin::Send: Control Message");
       
   191         return NO_ERRORS;
       
   192     }
       
   193     return ERR_DG_COMM_DATA_SEND;
       
   194 }
       
   195 
       
   196 /*
       
   197  * This method is used to wait for data to become available in incoming queue 
       
   198  * and then read all data into single Data object which is given as parameter
       
   199  */
       
   200 DWORD IPCommPlugin::ReceiveWait(Data** data_out, long timeout)
       
   201 {
       
   202     long elapsed = 0;
       
   203     while (elapsed < timeout && !IsDataAvailable())
       
   204     {
       
   205         elapsed += 25;
       
   206         Sleep(25);
       
   207     }
       
   208     if (elapsed >= timeout)
       
   209     {
       
   210         return ERR_DG_COMM_DATA_RECV_TIMEOUT;
       
   211     }
       
   212     return Receive(data_out, timeout);
       
   213 }
       
   214 
       
   215 /*
       
   216  * This method is used to read all data in incoming queue to single Data object and store the result
       
   217  * to the data object given parameter
       
   218  */
       
   219 DWORD IPCommPlugin::Receive(Data** data_out, long timeout)
       
   220 {
       
   221     if (!m_RxQueue.empty())
       
   222     {
       
   223 		*data_out = m_RxQueue.front();
       
   224         m_RxQueue.pop();
       
   225         return NO_ERRORS;
       
   226     }
       
   227     return ERR_DG_COMM_DATA_RECV;
       
   228 }
       
   229 
       
   230 
       
   231 DWORD IPCommPlugin::Open()
       
   232 {
       
   233     return (m_Open ? NO_ERRORS : ERR_DG_COMM_OPEN);
       
   234 }
       
   235 
       
   236 DWORD IPCommPlugin::Close()
       
   237 {
       
   238     m_MonitorThread->Stop();
       
   239     WaitForSingleObject(m_MonitorThread->ThreadHandle(), g_MaximumShutdownWaitTime);
       
   240     return NO_ERRORS;
       
   241 }
       
   242 
       
   243 
       
   244 //**********************************************************************************
       
   245 // Class IPCommReaderThread
       
   246 //
       
   247 // This thread is used to read bytes from TCP/IP socket, encapsulate the bytes to Data objects 
       
   248 // and push them to incoming queue 
       
   249 //**********************************************************************************
       
   250 
       
   251 IPCommReaderThread::IPCommReaderThread(SafeQueue<Data*>* q,
       
   252                                        long bufsize)
       
   253 	:m_Running(false),
       
   254     m_Socket(NULL)
       
   255 {
       
   256 	m_Queue = q;
       
   257 	m_TcpIpBufferSize = bufsize;
       
   258 }
       
   259 
       
   260 /*
       
   261  * Main execution loop which reads bytes from socket, encapsulates the bytes to Data object and pushes them to incoming queue
       
   262  */	
       
   263 void IPCommReaderThread::Run()
       
   264 {
       
   265 	if( m_Socket )
       
   266 		m_Running = true;
       
   267 
       
   268 	BYTE* buffer = new BYTE[m_TcpIpBufferSize];
       
   269 	while (m_Running)
       
   270 	{
       
   271 		// Reading from TCP/IP port
       
   272 		//Util::Debug("[IPCommReaderThread] try to read");
       
   273 		int bytes_read = -1;
       
   274 		bytes_read = m_Socket->ReceiveBytes(buffer, m_TcpIpBufferSize);
       
   275 		if (bytes_read < 0)
       
   276 		{
       
   277 			Stop();
       
   278 			break;
       
   279 		}
       
   280 		if (bytes_read > 0)
       
   281 		{
       
   282 			Data* d = new Data((void *)buffer, bytes_read, Data::EData);
       
   283 			if (Util::GetVerboseLevel() == Util::VerboseLevel::debug)
       
   284 			{
       
   285 				char tmp[64];
       
   286 				sprintf(tmp, "m_Socket->ReceiveBytes (%d (dec) bytes):", d->GetLength());
       
   287 				string s(tmp);
       
   288 				Util::Debug(s);
       
   289 			}
       
   290 			m_Queue->push(d);
       
   291 			d = NULL;
       
   292 		}
       
   293 		Sleep(0);
       
   294 	}
       
   295 	delete[] buffer;
       
   296 	buffer = NULL;
       
   297 }
       
   298 
       
   299 void IPCommReaderThread::Stop()
       
   300 {
       
   301 	m_Running = false;
       
   302 }
       
   303 
       
   304 bool IPCommReaderThread::IsRunning()
       
   305 {
       
   306 	return m_Running;
       
   307 }
       
   308 
       
   309 
       
   310 
       
   311 //**********************************************************************************
       
   312 // Class IPCommWriterThread
       
   313 //
       
   314 // This thread is used to write data from outgoing queue to TCP/IP socket
       
   315 //**********************************************************************************
       
   316 
       
   317 IPCommWriterThread::IPCommWriterThread(SafeQueue<Data*>* q)
       
   318 	:m_Running(false),
       
   319 	m_Socket(NULL)
       
   320 {
       
   321 	m_Queue = q;
       
   322 }
       
   323 
       
   324 /*
       
   325  * This method contains the main execution loop which gets Data from outgoing queue and sends it to socket
       
   326  */
       
   327 void IPCommWriterThread::Run()
       
   328 {
       
   329 	if( m_Socket )
       
   330 		m_Running = true;
       
   331 
       
   332 	while (m_Running)
       
   333 	{
       
   334 		// Sending to TCP/IP port
       
   335 		//Util::Debug("[IPCommWriterThread] try to send");
       
   336 		try
       
   337 		{
       
   338 			Data* d = m_Queue->front(50);
       
   339 			char* p = (char *)d->GetData();
       
   340 			DWORD l = d->GetLength();
       
   341 
       
   342 			if (Util::GetVerboseLevel() == Util::VerboseLevel::debug)
       
   343 			{
       
   344 				char tmp[64];
       
   345 				sprintf(tmp, "[IPCommWriterThread] HTI MsgSize = %d", l);
       
   346 				string s(tmp);
       
   347 				Util::Debug(s);
       
   348 			}
       
   349 
       
   350 			m_Socket->SendBytes((const unsigned char *)p, l);
       
   351 			m_Queue->pop();
       
   352 			delete d;
       
   353 			d = NULL;
       
   354 
       
   355 		} catch (TimeoutException te)
       
   356 		{
       
   357 		//Util::Debug("[IPCommWriterThread]timeout exception");
       
   358 		}
       
   359 	}
       
   360 }
       
   361 
       
   362 void IPCommWriterThread::Stop()
       
   363 {
       
   364 	m_Running = false;
       
   365 }
       
   366 
       
   367 bool IPCommWriterThread::IsRunning()
       
   368 {
       
   369 	return m_Running;
       
   370 }
       
   371 
       
   372 
       
   373 //**********************************************************************************
       
   374 // Class IPCommMonitorThread
       
   375 //
       
   376 // This thread creates and starts reader and writer threads
       
   377 // The thread also monitors if reader and writer threads are running and restarts them in case either isn't running
       
   378 //**********************************************************************************
       
   379 
       
   380 IPCommMonitorThread::IPCommMonitorThread(SafeQueue<Data*>* TxQueue,
       
   381 										 SafeQueue<Data*>* RxQueue,
       
   382 										 int LocalPort,
       
   383                                          string& RemoteHost,
       
   384                                          int RemotePort,
       
   385 										 long RecvBufferSize)
       
   386 	: m_Running(false),
       
   387 	m_TxQueue(TxQueue),
       
   388 	m_RxQueue(RxQueue),
       
   389 	m_ReaderThread(NULL),
       
   390 	m_WriterThread(NULL),
       
   391 	m_LocalPort(LocalPort),
       
   392     m_RemoteHost(RemoteHost),
       
   393 	m_RemotePort(RemotePort),
       
   394 	m_RecvBufferSize(RecvBufferSize)
       
   395 {
       
   396 }
       
   397 
       
   398 IPCommMonitorThread::~IPCommMonitorThread()
       
   399 {
       
   400 	if(m_ReaderThread)
       
   401 	{
       
   402 		delete m_ReaderThread;
       
   403 		m_ReaderThread = NULL;
       
   404 	}
       
   405 	if(m_WriterThread)
       
   406 	{
       
   407 		delete m_WriterThread;
       
   408 		m_WriterThread = NULL;
       
   409 	}
       
   410 }
       
   411 
       
   412 /*
       
   413  * This method has two functionalities
       
   414  * -It waits for incoming connections if local port is defined
       
   415  * -It tries to connect to remote host if local host is not defined
       
   416  */
       
   417 void IPCommMonitorThread::Connect(Socket*& s)
       
   418 {
       
   419 	// This trickery here is because if there are no sockets (Socket::nofSockets_)
       
   420 	// WSACleanup gets called and then SOAP gets messed up.
       
   421 	// And creating a new socket for a new connection seems to work better
       
   422 	// than using the old when re-connecting / re-listening.
       
   423 	Socket* new_s = new Socket();
       
   424 	delete s;
       
   425 	s = new_s;
       
   426 
       
   427 	// If local port is defined start open listening socket
       
   428 	SocketServer ss;
       
   429 	if( m_LocalPort )
       
   430 	{
       
   431 		Util::Info("[IPComm] Listen for incoming connection...");
       
   432 		String remoteHost( "[IPComm] Connected! Remote host : " );
       
   433 		ss.Accept( s, m_LocalPort, 1, remoteHost );
       
   434 		Util::Info(remoteHost);
       
   435 	}
       
   436 	// If not start connecting
       
   437 	else
       
   438 	{
       
   439 		Util::Info("[IPComm] Connecting...");
       
   440 		ss.Connect( s, m_RemoteHost.c_str(), m_RemotePort );
       
   441 		Util::Info("[IPComm] Connected!");
       
   442 	}
       
   443 }
       
   444 
       
   445 /*
       
   446  * Main execution loop of thread
       
   447  * -Creates reader and writer threads and starts them
       
   448  * -Monitors if either reader or writer thread aren't running and restarts them if not
       
   449  */
       
   450 void IPCommMonitorThread::Run()
       
   451 {
       
   452 	Socket* s = NULL;
       
   453 
       
   454 	m_ReaderThread = new IPCommReaderThread( m_RxQueue, m_RecvBufferSize );
       
   455 	m_WriterThread = new IPCommWriterThread( m_TxQueue ) ;
       
   456 
       
   457 	m_Running = true;
       
   458 	while (m_Running)
       
   459 	{
       
   460 		// Reader thread should stop running when connection is lost
       
   461 		if( !m_ReaderThread->IsRunning() || !m_WriterThread->IsRunning() )
       
   462 		{
       
   463 			Util::Info( "[IPComm] Disconnected!" );
       
   464 
       
   465 			// Stop the treads
       
   466 			m_ReaderThread->Stop();
       
   467 		    WaitForSingleObject(m_ReaderThread->ThreadHandle(), g_MaximumShutdownWaitTime);
       
   468 			m_WriterThread->Stop();
       
   469 			WaitForSingleObject(m_WriterThread->ThreadHandle(), g_MaximumShutdownWaitTime);
       
   470 
       
   471 			// Try to connect again.
       
   472 			BOOL connected = false;
       
   473 			while( m_Running && connected == false)
       
   474 			{
       
   475 				try{
       
   476 					Connect(s);
       
   477 					connected = true;
       
   478 					m_ReaderThread->m_Socket = s;
       
   479 					m_WriterThread->m_Socket = s;
       
   480 
       
   481 					// Start threads
       
   482 					m_ReaderThread->Start();
       
   483 					m_WriterThread->Start();
       
   484 					SetThreadPriority( m_ReaderThread->ThreadHandle(), THREAD_PRIORITY_LOWEST);
       
   485 					int priority = GetThreadPriority(m_ReaderThread->ThreadHandle());
       
   486 					int i = 0;
       
   487 				}
       
   488 				catch( char* ){
       
   489 					Sleep(1000);
       
   490 				}
       
   491 			}
       
   492 		}
       
   493 		Sleep(1000);
       
   494 	}
       
   495 
       
   496 	// Stop the treads
       
   497 	m_ReaderThread->Stop();
       
   498 	WaitForSingleObject(m_ReaderThread->ThreadHandle(), g_MaximumShutdownWaitTime);
       
   499 	m_WriterThread->Stop();
       
   500 	WaitForSingleObject(m_WriterThread->ThreadHandle(), g_MaximumShutdownWaitTime);
       
   501 	// and close the current socket
       
   502 	if( s )
       
   503 		delete s;
       
   504 }
       
   505 
       
   506 void IPCommMonitorThread::Stop()
       
   507 {
       
   508 	m_Running = false;
       
   509 	m_ReaderThread->Stop();
       
   510 	m_WriterThread->Stop();
       
   511 }
       
   512 
       
   513 bool IPCommMonitorThread::IsRunning()
       
   514 {
       
   515 	return m_Running;
       
   516 }
       
   517 
       
   518 // End of the file