hti/PC_Tools/DataGateway/SRC/IPComm.cpp
branchRCL_3
changeset 59 8ad140f3dd41
parent 0 a03f92240627
equal deleted inserted replaced
49:7fdc9a71d314 59:8ad140f3dd41
       
     1 /*
       
     2 * Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies). 
       
     3 * All rights reserved.
       
     4 * This component and the accompanying materials are made available
       
     5 * under the terms of "Eclipse Public License v1.0"
       
     6 * which accompanies this distribution, and is available
       
     7 * at the URL "http://www.eclipse.org/legal/epl-v10.html".
       
     8 *
       
     9 * Initial Contributors:
       
    10 * Nokia Corporation - initial contribution.
       
    11 *
       
    12 * Contributors:
       
    13 * 
       
    14 * Description:
       
    15 *   This file contains the header file of the IPCommPlugin,
       
    16 *	IPCommReaderThread, IPCommWriterThread and IPCommMonitorThread classes.
       
    17 */
       
    18 
       
    19 // INCLUDES
       
    20 #include "socket.h"
       
    21 #include "IPCommPlugin.h"
       
    22 
       
    23 const static int g_IPMaxResendNumber = 2;
       
    24 
       
    25 //**********************************************************************************
       
    26 // Class IPCommPlugin
       
    27 //
       
    28 // This class implements a CommChannelPlugin which is used to communicate with device using TCP/IP
       
    29 //**********************************************************************************
       
    30 
       
    31 IPCommPlugin::IPCommPlugin(const CommChannelPluginObserver* observer)
       
    32     : CommChannelPlugin(observer),
       
    33       m_TxQueue(),
       
    34       m_RxQueue(),
       
    35 	  m_PropertyLocalPort(0),
       
    36 	  m_PropertyRemotePort(0)
       
    37 {
       
    38     m_MonitorThread = NULL;
       
    39 }
       
    40 
       
    41 IPCommPlugin::~IPCommPlugin()
       
    42 {
       
    43     Util::Debug("IPCommPlugin::~IPCommPlugin()");
       
    44     if (m_Open)
       
    45     {
       
    46         Close();
       
    47     }
       
    48     if (m_MonitorThread != NULL)
       
    49     {
       
    50         delete m_MonitorThread;
       
    51         m_MonitorThread = NULL;
       
    52     }
       
    53 }
       
    54 
       
    55 /*
       
    56  * This method initializes IPCommPlugin and Starts IPCommMonitorThread
       
    57  */
       
    58 DWORD IPCommPlugin::Init()
       
    59 {
       
    60     Util::Debug("IPCommPlugin::Init()");
       
    61 	
       
    62     std::string filename = IP_INI_FILE_NAME;
       
    63 	map<string, string> IPCommPluginProperties;
       
    64 	Util::ReadProperties(filename.c_str(), IPCommPluginProperties);
       
    65 
       
    66 	CheckProperties(IPCommPluginProperties);
       
    67 
       
    68     CommChannelPlugin::Init();
       
    69 
       
    70 	m_MonitorThread = new IPCommMonitorThread(
       
    71 		&m_TxQueue, &m_RxQueue, m_PropertyLocalPort, m_PropertyRemoteHost, m_PropertyRemotePort);
       
    72 
       
    73 	m_MonitorThread->Start();
       
    74 
       
    75     Util::Debug("IPCommPlugin::Init() IPComm opened");
       
    76     m_Open = true;
       
    77     Util::Debug("IPCommPlugin::Init() OK");
       
    78     return NO_ERRORS;
       
    79 }
       
    80 
       
    81 /*
       
    82  * This method initializes class member variables from values in map
       
    83  */
       
    84 void IPCommPlugin::CheckProperties(map<string, string>& props)
       
    85 {
       
    86     char tmp[256];
       
    87 
       
    88     //Local port
       
    89     string val = props[IP_INI_LOCAL_PORT_PARAM];
       
    90 	Util::CheckCommandlineParam( PARAM_SWITCH_LOCAL_PORT_PARAM, val );
       
    91     if (!val.empty())
       
    92     {
       
    93         m_PropertyLocalPort = atol(val.c_str());
       
    94     }
       
    95 
       
    96 	if( m_PropertyLocalPort )
       
    97 	{
       
    98 		sprintf(tmp, "[IPComm] Local port : %d", m_PropertyLocalPort );
       
    99 		string s(tmp);
       
   100 		Util::Info(s);
       
   101 	}
       
   102 	else
       
   103 	{
       
   104 		//Remote host
       
   105 		m_PropertyRemoteHost = props[IP_INI_REMOTE_HOST_PARAM];
       
   106 		//Check and replace if -REMOTE_HOST was given as command line parameter
       
   107 		Util::CheckCommandlineParam( PARAM_SWITCH_REMOTE_HOST_PARAM, m_PropertyRemoteHost );
       
   108 		if(m_PropertyRemoteHost.empty())
       
   109 		{
       
   110 			throw "No remote host specified!";
       
   111 		}
       
   112 		sprintf(tmp, "[IPComm] Remote host : '%s'", m_PropertyRemoteHost.c_str());
       
   113 		string s = tmp;
       
   114 		Util::Info(s);
       
   115 
       
   116 		//Remote port
       
   117 		val = props[IP_INI_REMOTE_PORT_PARAM];
       
   118 		//Check and replace if -REMOTE_POST was given as command line parameter
       
   119 		Util::CheckCommandlineParam( PARAM_SWITCH_REMOTE_PORT_PARAM, val );
       
   120 		if (!val.empty())
       
   121 		{
       
   122 			m_PropertyRemotePort = atol(val.c_str());
       
   123 		}
       
   124 		if( m_PropertyRemotePort == 0)
       
   125 			throw "Invalid remote port specified!";
       
   126 
       
   127 		sprintf(tmp, "[IPComm] Remote port : %d", m_PropertyRemotePort );
       
   128 		s = tmp;
       
   129 		Util::Info(s);
       
   130 	}
       
   131 }
       
   132 
       
   133 /*
       
   134  * This method checks if data is available on incoming queue
       
   135  */
       
   136 bool IPCommPlugin::IsDataAvailable()
       
   137 {
       
   138     return (!m_RxQueue.empty());
       
   139 }
       
   140 
       
   141 /*
       
   142  * This method is used to push given data to outgoing queue and then 
       
   143  * wait for data to become available and read all data into single Data object 
       
   144  */
       
   145 DWORD IPCommPlugin::SendReceive(Data* data_in, Data** data_out, long timeout)
       
   146 {
       
   147     DWORD res;
       
   148     if ((res = Send(data_in, timeout)) == NO_ERRORS &&
       
   149         (res = ReceiveWait(data_out, timeout)) == NO_ERRORS)
       
   150     {
       
   151         return NO_ERRORS;
       
   152     }
       
   153     cout << "IPCommPlugin::SendReceive: error" << endl;
       
   154     return res;
       
   155 }
       
   156 
       
   157 /*
       
   158  * This method pushes the given Data object(of type Data::EData) to outgoing queue
       
   159  */
       
   160 DWORD IPCommPlugin::Send(Data* data_in, long timeout)
       
   161 {
       
   162     Data::DataType type = data_in->GetType();
       
   163     if (type == Data::EData)
       
   164     {
       
   165         DWORD length = data_in->GetLength();
       
   166         m_TxQueue.push(data_in);
       
   167         return NO_ERRORS;
       
   168     }
       
   169     else if (type == Data::EControl)
       
   170     {
       
   171         Util::Debug("IPCommPlugin::Send: Control Message");
       
   172         return NO_ERRORS;
       
   173     }
       
   174     return ERR_DG_COMM_DATA_SEND;
       
   175 }
       
   176 
       
   177 
       
   178 /*
       
   179  * This method is used to wait for data to become available in incoming queue 
       
   180  * and then read all data into single Data object which is given as parameter
       
   181  */
       
   182 DWORD IPCommPlugin::ReceiveWait(Data** data_out, long timeout)
       
   183 {
       
   184     long elapsed = 0;
       
   185     while (elapsed < timeout && !IsDataAvailable())
       
   186     {
       
   187         elapsed += 25;
       
   188         Sleep(25);
       
   189     }
       
   190     if (elapsed >= timeout)
       
   191     {
       
   192         return ERR_DG_COMM_DATA_RECV_TIMEOUT;
       
   193     }
       
   194     return Receive(data_out, timeout);
       
   195 }
       
   196 
       
   197 /*
       
   198  * This method is used to read all data in incoming queue to single Data object and store the result
       
   199  * to the data object given parameter
       
   200  */
       
   201 DWORD IPCommPlugin::Receive(Data** data_out, long timeout)
       
   202 {
       
   203     if (!m_RxQueue.empty())
       
   204     {
       
   205         *data_out = m_RxQueue.front();
       
   206         m_RxQueue.pop();
       
   207 	return NO_ERRORS;
       
   208     }
       
   209     return ERR_DG_COMM_DATA_RECV;
       
   210 }
       
   211 
       
   212 
       
   213 DWORD IPCommPlugin::Open()
       
   214 {
       
   215     return (m_Open ? NO_ERRORS : ERR_DG_COMM_OPEN);
       
   216 }
       
   217 
       
   218 DWORD IPCommPlugin::Close()
       
   219 {
       
   220     m_MonitorThread->Stop();
       
   221     WaitForSingleObject(m_MonitorThread->ThreadHandle(), g_MaximumShutdownWaitTime);
       
   222     return NO_ERRORS;
       
   223 }
       
   224 
       
   225 
       
   226 //**********************************************************************************
       
   227 // Class IPCommReaderThread
       
   228 //
       
   229 // This thread is used to read bytes from TCP/IP socket, encapsulate the bytes to Data objects 
       
   230 // and push them to incoming queue 
       
   231 //**********************************************************************************
       
   232 
       
   233 IPCommReaderThread::IPCommReaderThread(SafeQueue<Data*>* q,
       
   234                                        long bufsize)
       
   235 	:m_Running(false),
       
   236     m_Socket(NULL)
       
   237 {
       
   238 	m_Queue = q;
       
   239 	if (bufsize > 0)
       
   240 	{
       
   241 		m_TcpIpBufferSize = bufsize;
       
   242 	}
       
   243 	else
       
   244 	{
       
   245 		m_TcpIpBufferSize = g_DataGatewayDefaultTcpIpBufferSize;
       
   246 	}
       
   247 }
       
   248 
       
   249 /*
       
   250  * Main execution loop which reads bytes from socket, encapsulates the bytes to Data object and pushes them to incoming queue
       
   251  */	
       
   252 void IPCommReaderThread::Run()
       
   253 {
       
   254 	if( m_Socket )
       
   255 		m_Running = true;
       
   256 
       
   257 	BYTE* buffer = new BYTE[m_TcpIpBufferSize];
       
   258 	while (m_Running)
       
   259 	{
       
   260 		//Reading from TCP/IP port
       
   261 		//Util::Debug("[IPCommReaderThread] try to read");
       
   262 		int bytes_read = -1;
       
   263 		bytes_read = m_Socket->ReceiveBytes(buffer, m_TcpIpBufferSize);
       
   264 		if (bytes_read < 0)
       
   265 		{
       
   266 			Stop();
       
   267 			break;
       
   268 		}
       
   269 		if (bytes_read > 0) 
       
   270 		{
       
   271 			Data* d = new Data((void *)buffer, bytes_read, Data::EData);
       
   272 			if (Util::GetVerboseLevel() == Util::debug)
       
   273 			{
       
   274 				char tmp[64];
       
   275 				sprintf(tmp, "m_Socket->ReceiveBytes (%d (dec) bytes):", d->GetLength());
       
   276 				string s(tmp);
       
   277 				Util::Debug(s);
       
   278 			}
       
   279 			m_Queue->push(d);
       
   280 			d = NULL;
       
   281 		}
       
   282 		Sleep(50);
       
   283 	}
       
   284 	delete[] buffer;
       
   285 	buffer = NULL;
       
   286 }
       
   287 
       
   288 void IPCommReaderThread::Stop()
       
   289 {
       
   290 	m_Running = false;
       
   291 }
       
   292 
       
   293 bool IPCommReaderThread::IsRunning()
       
   294 {
       
   295 	return m_Running;
       
   296 }
       
   297 
       
   298 
       
   299 
       
   300 //**********************************************************************************
       
   301 // Class IPCommWriterThread
       
   302 //
       
   303 // This thread is used to write data from outgoing queue to TCP/IP socket
       
   304 //**********************************************************************************
       
   305 
       
   306 IPCommWriterThread::IPCommWriterThread(SafeQueue<Data*>* q)
       
   307 	:m_Running(false),
       
   308 	m_Socket(NULL)
       
   309 {
       
   310 	m_Queue = q;
       
   311 }
       
   312 
       
   313 /*
       
   314  * This method contains the main execution loop which gets Data from outgoing queue and sends it to socket
       
   315  */
       
   316 void IPCommWriterThread::Run()
       
   317 {
       
   318 	if( m_Socket )
       
   319 		m_Running = true;
       
   320 
       
   321 	while (m_Running)
       
   322 	{
       
   323 		// Sending to TCP/IP port
       
   324 		//Util::Debug("[IPCommWriterThread] try to send");
       
   325 		try
       
   326 		{
       
   327 			Data* d = m_Queue->front(50);
       
   328 			char* p = (char *)d->GetData();
       
   329 			DWORD l = d->GetLength();
       
   330 
       
   331             if (Util::GetVerboseLevel() == Util::debug)
       
   332 			{
       
   333 				char tmp[64];
       
   334 				sprintf(tmp, "[IPCommWriterThread] HTI MsgSize = %d", l);
       
   335 				string s(tmp);
       
   336 				Util::Debug(s);
       
   337 			}
       
   338 
       
   339 			m_Socket->SendBytes((const unsigned char *)p, l);
       
   340 			m_Queue->pop();
       
   341 			delete d;
       
   342 			d = NULL;
       
   343 
       
   344 		} catch (TimeoutException te)
       
   345 		{
       
   346 		//Util::Debug("[IPCommWriterThread]timeout exception");
       
   347 		}
       
   348 	}
       
   349 }
       
   350 
       
   351 void IPCommWriterThread::Stop()
       
   352 {
       
   353 	m_Running = false;
       
   354 }
       
   355 
       
   356 bool IPCommWriterThread::IsRunning()
       
   357 {
       
   358 	return m_Running;
       
   359 }
       
   360 
       
   361 
       
   362 //**********************************************************************************
       
   363 // Class IPCommMonitorThread
       
   364 //
       
   365 // This thread creates and starts reader and writer threads
       
   366 // The thread also monitors if reader and writer threads are running and restarts them in case either isn't running
       
   367 //**********************************************************************************
       
   368 
       
   369 IPCommMonitorThread::IPCommMonitorThread(SafeQueue<Data*>* TxQueue,
       
   370 										 SafeQueue<Data*>* RxQueue,
       
   371 										 int localPort,
       
   372                                          string& remoteHost,
       
   373                                          int remotePort)
       
   374 	: m_Running(false),
       
   375 	m_TxQueue(TxQueue),
       
   376 	m_RxQueue(RxQueue),
       
   377 	m_ReaderThread(NULL),
       
   378 	m_WriterThread(NULL),
       
   379 	m_LocalPort(localPort),
       
   380     m_RemoteHost(remoteHost),
       
   381 	m_RemotePort(remotePort)
       
   382 {
       
   383 }
       
   384 
       
   385 IPCommMonitorThread::~IPCommMonitorThread()
       
   386 {
       
   387 	if(m_ReaderThread)
       
   388 	{
       
   389 		delete m_ReaderThread;
       
   390 		m_ReaderThread = NULL;
       
   391 	}
       
   392 	if(m_WriterThread)
       
   393 	{
       
   394 		delete m_WriterThread;
       
   395 		m_WriterThread = NULL;
       
   396 	}
       
   397 }
       
   398 
       
   399 
       
   400 /*
       
   401  * This method has two functionalities
       
   402  * -It waits for incoming connections if local port is defined
       
   403  * -It tries to connect to remote host if local host is not defined
       
   404  */
       
   405 void IPCommMonitorThread::Connect(Socket*& s)
       
   406 {
       
   407 	// This trickery here is because if there are no sockets (Socket::nofSockets_)
       
   408 	// WSACleanup gets called and then SOAP gets messed up.
       
   409 	// And creating a new socket for a new connection seems to work better
       
   410 	// than using the old when re-connecting / re-listening.
       
   411 	Socket* new_s = new Socket();
       
   412 	delete s;
       
   413 	s = new_s;
       
   414 
       
   415 	// If local port is defined start open listening socket
       
   416 	SocketServer ss;
       
   417 	if( m_LocalPort )
       
   418 	{
       
   419 		Util::Info("[IPComm] Listen for incoming connection...");
       
   420 		String remoteHost( "[IPComm] Connected! Remote host : " );
       
   421 		ss.Accept( s, m_LocalPort, 1, remoteHost );
       
   422 		Util::Info(remoteHost);
       
   423 	}
       
   424 	// If not start connecting
       
   425 	else
       
   426 	{
       
   427 		Util::Info("[IPComm] Connecting...");
       
   428 		ss.Connect( s, m_RemoteHost.c_str(), m_RemotePort );
       
   429 		Util::Info("[IPComm] Connected!");
       
   430 	}
       
   431 }
       
   432 
       
   433 /*
       
   434  * Main execution loop of thread
       
   435  * -Creates reader and writer threads and starts them
       
   436  * -Monitors if either reader or writer thread aren't running and restarts them if not
       
   437  */
       
   438 void IPCommMonitorThread::Run()
       
   439 {
       
   440 	Socket* s = NULL;
       
   441 
       
   442 	m_ReaderThread = new IPCommReaderThread( m_RxQueue, g_DataGatewayDefaultTcpIpBufferSize );
       
   443 	m_WriterThread = new IPCommWriterThread( m_TxQueue ) ;
       
   444 
       
   445 	m_Running = true;
       
   446 	while (m_Running)
       
   447 	{
       
   448 		// Reader thread should stop running when connection is lost
       
   449 		if( !m_ReaderThread->IsRunning() || !m_WriterThread->IsRunning() )
       
   450 		{
       
   451 			Util::Info( "[IPComm] Disconnected!" );
       
   452 			
       
   453 			// Stop the treads
       
   454 			m_ReaderThread->Stop();
       
   455 		    WaitForSingleObject(m_ReaderThread->ThreadHandle(), g_MaximumShutdownWaitTime);
       
   456 			m_WriterThread->Stop();
       
   457 			WaitForSingleObject(m_WriterThread->ThreadHandle(), g_MaximumShutdownWaitTime);
       
   458 			
       
   459 			// Try to connect again. 
       
   460 			BOOL connected = false;
       
   461 			while( m_Running && connected == false)
       
   462 			{
       
   463 				try{
       
   464 					Connect(s);
       
   465 					connected = true;
       
   466 					m_ReaderThread->m_Socket = s;
       
   467 					m_WriterThread->m_Socket = s;
       
   468 
       
   469 					// Start threads
       
   470 					m_ReaderThread->Start();
       
   471 					m_WriterThread->Start();
       
   472 				}
       
   473 				catch( char* ){
       
   474 					Sleep(1000);
       
   475 				}
       
   476 			}
       
   477 		}
       
   478 		Sleep(1000);
       
   479 	}
       
   480 
       
   481 	// Stop the treads
       
   482 	m_ReaderThread->Stop();
       
   483 	WaitForSingleObject(m_ReaderThread->ThreadHandle(), g_MaximumShutdownWaitTime);
       
   484 	m_WriterThread->Stop();
       
   485 	WaitForSingleObject(m_WriterThread->ThreadHandle(), g_MaximumShutdownWaitTime);
       
   486 	// and close the current socket
       
   487 	if( s )
       
   488 		delete s;
       
   489 }
       
   490 
       
   491 void IPCommMonitorThread::Stop()
       
   492 {
       
   493 	m_Running = false;
       
   494 	m_ReaderThread->Stop();
       
   495 	m_WriterThread->Stop();
       
   496 }
       
   497 
       
   498 bool IPCommMonitorThread::IsRunning()
       
   499 {
       
   500 	return m_Running;
       
   501 }
       
   502 
       
   503 // End of the file