changeset 0 a03f92240627
equal deleted inserted replaced
-1:000000000000 0:a03f92240627
     1 /*
     2 * Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies). 
     3 * All rights reserved.
     4 * This component and the accompanying materials are made available
     5 * under the terms of "Eclipse Public License v1.0"
     6 * which accompanies this distribution, and is available
     7 * at the URL "".
     8 *
     9 * Initial Contributors:
    10 * Nokia Corporation - initial contribution.
    11 *
    12 * Contributors:
    13 * 
    14 * Description:
    15 *   This file contains the header file of the IPCommPlugin,
    16 *	IPCommReaderThread, IPCommWriterThread and IPCommMonitorThread classes.
    17 */
    19 // INCLUDES
    20 #include "socket.h"
    21 #include "IPCommPlugin.h"
    23 const static int g_IPMaxResendNumber = 2;
    25 //**********************************************************************************
    26 // Class IPCommPlugin
    27 //
    28 // This class implements a CommChannelPlugin which is used to communicate with device using TCP/IP
    29 //**********************************************************************************
    31 IPCommPlugin::IPCommPlugin(const CommChannelPluginObserver* observer)
    32     : CommChannelPlugin(observer),
    33       m_TxQueue(),
    34       m_RxQueue(),
    35 	  m_PropertyLocalPort(0),
    36 	  m_PropertyRemotePort(0)
    37 {
    38     m_MonitorThread = NULL;
    39 }
    41 IPCommPlugin::~IPCommPlugin()
    42 {
    43     Util::Debug("IPCommPlugin::~IPCommPlugin()");
    44     if (m_Open)
    45     {
    46         Close();
    47     }
    48     if (m_MonitorThread != NULL)
    49     {
    50         delete m_MonitorThread;
    51         m_MonitorThread = NULL;
    52     }
    53 }
    55 /*
    56  * This method initializes IPCommPlugin and Starts IPCommMonitorThread
    57  */
    58 DWORD IPCommPlugin::Init()
    59 {
    60     Util::Debug("IPCommPlugin::Init()");
    62     std::string filename = IP_INI_FILE_NAME;
    63 	map<string, string> IPCommPluginProperties;
    64 	Util::ReadProperties(filename.c_str(), IPCommPluginProperties);
    66 	CheckProperties(IPCommPluginProperties);
    68     CommChannelPlugin::Init();
    70 	m_MonitorThread = new IPCommMonitorThread(
    71 		&m_TxQueue, &m_RxQueue, m_PropertyLocalPort, m_PropertyRemoteHost, m_PropertyRemotePort);
    73 	m_MonitorThread->Start();
    75     Util::Debug("IPCommPlugin::Init() IPComm opened");
    76     m_Open = true;
    77     Util::Debug("IPCommPlugin::Init() OK");
    78     return NO_ERRORS;
    79 }
    81 /*
    82  * This method initializes class member variables from values in map
    83  */
    84 void IPCommPlugin::CheckProperties(map<string, string>& props)
    85 {
    86     char tmp[256];
    88     //Local port
    89     string val = props[IP_INI_LOCAL_PORT_PARAM];
    90 	Util::CheckCommandlineParam( PARAM_SWITCH_LOCAL_PORT_PARAM, val );
    91     if (!val.empty())
    92     {
    93         m_PropertyLocalPort = atol(val.c_str());
    94     }
    96 	if( m_PropertyLocalPort )
    97 	{
    98 		sprintf(tmp, "[IPComm] Local port : %d", m_PropertyLocalPort );
    99 		string s(tmp);
   100 		Util::Info(s);
   101 	}
   102 	else
   103 	{
   104 		//Remote host
   105 		m_PropertyRemoteHost = props[IP_INI_REMOTE_HOST_PARAM];
   106 		//Check and replace if -REMOTE_HOST was given as command line parameter
   107 		Util::CheckCommandlineParam( PARAM_SWITCH_REMOTE_HOST_PARAM, m_PropertyRemoteHost );
   108 		if(m_PropertyRemoteHost.empty())
   109 		{
   110 			throw "No remote host specified!";
   111 		}
   112 		sprintf(tmp, "[IPComm] Remote host : '%s'", m_PropertyRemoteHost.c_str());
   113 		string s = tmp;
   114 		Util::Info(s);
   116 		//Remote port
   117 		val = props[IP_INI_REMOTE_PORT_PARAM];
   118 		//Check and replace if -REMOTE_POST was given as command line parameter
   119 		Util::CheckCommandlineParam( PARAM_SWITCH_REMOTE_PORT_PARAM, val );
   120 		if (!val.empty())
   121 		{
   122 			m_PropertyRemotePort = atol(val.c_str());
   123 		}
   124 		if( m_PropertyRemotePort == 0)
   125 			throw "Invalid remote port specified!";
   127 		sprintf(tmp, "[IPComm] Remote port : %d", m_PropertyRemotePort );
   128 		s = tmp;
   129 		Util::Info(s);
   130 	}
   131 }
   133 /*
   134  * This method checks if data is available on incoming queue
   135  */
   136 bool IPCommPlugin::IsDataAvailable()
   137 {
   138     return (!m_RxQueue.empty());
   139 }
   141 /*
   142  * This method is used to push given data to outgoing queue and then 
   143  * wait for data to become available and read all data into single Data object 
   144  */
   145 DWORD IPCommPlugin::SendReceive(Data* data_in, Data** data_out, long timeout)
   146 {
   147     DWORD res;
   148     if ((res = Send(data_in, timeout)) == NO_ERRORS &&
   149         (res = ReceiveWait(data_out, timeout)) == NO_ERRORS)
   150     {
   151         return NO_ERRORS;
   152     }
   153     cout << "IPCommPlugin::SendReceive: error" << endl;
   154     return res;
   155 }
   157 /*
   158  * This method pushes the given Data object(of type Data::EData) to outgoing queue
   159  */
   160 DWORD IPCommPlugin::Send(Data* data_in, long timeout)
   161 {
   162     Data::DataType type = data_in->GetType();
   163     if (type == Data::EData)
   164     {
   165         DWORD length = data_in->GetLength();
   166         m_TxQueue.push(data_in);
   167         return NO_ERRORS;
   168     }
   169     else if (type == Data::EControl)
   170     {
   171         Util::Debug("IPCommPlugin::Send: Control Message");
   172         return NO_ERRORS;
   173     }
   174     return ERR_DG_COMM_DATA_SEND;
   175 }
   178 /*
   179  * This method is used to wait for data to become available in incoming queue 
   180  * and then read all data into single Data object which is given as parameter
   181  */
   182 DWORD IPCommPlugin::ReceiveWait(Data** data_out, long timeout)
   183 {
   184     long elapsed = 0;
   185     while (elapsed < timeout && !IsDataAvailable())
   186     {
   187         elapsed += 25;
   188         Sleep(25);
   189     }
   190     if (elapsed >= timeout)
   191     {
   192         return ERR_DG_COMM_DATA_RECV_TIMEOUT;
   193     }
   194     return Receive(data_out, timeout);
   195 }
   197 /*
   198  * This method is used to read all data in incoming queue to single Data object and store the result
   199  * to the data object given parameter
   200  */
   201 DWORD IPCommPlugin::Receive(Data** data_out, long timeout)
   202 {
   203     if (!m_RxQueue.empty())
   204     {
   205         *data_out = m_RxQueue.front();
   206         m_RxQueue.pop();
   207 	return NO_ERRORS;
   208     }
   209     return ERR_DG_COMM_DATA_RECV;
   210 }
   213 DWORD IPCommPlugin::Open()
   214 {
   215     return (m_Open ? NO_ERRORS : ERR_DG_COMM_OPEN);
   216 }
   218 DWORD IPCommPlugin::Close()
   219 {
   220     m_MonitorThread->Stop();
   221     WaitForSingleObject(m_MonitorThread->ThreadHandle(), g_MaximumShutdownWaitTime);
   222     return NO_ERRORS;
   223 }
   226 //**********************************************************************************
   227 // Class IPCommReaderThread
   228 //
   229 // This thread is used to read bytes from TCP/IP socket, encapsulate the bytes to Data objects 
   230 // and push them to incoming queue 
   231 //**********************************************************************************
   233 IPCommReaderThread::IPCommReaderThread(SafeQueue<Data*>* q,
   234                                        long bufsize)
   235 	:m_Running(false),
   236     m_Socket(NULL)
   237 {
   238 	m_Queue = q;
   239 	if (bufsize > 0)
   240 	{
   241 		m_TcpIpBufferSize = bufsize;
   242 	}
   243 	else
   244 	{
   245 		m_TcpIpBufferSize = g_DataGatewayDefaultTcpIpBufferSize;
   246 	}
   247 }
   249 /*
   250  * Main execution loop which reads bytes from socket, encapsulates the bytes to Data object and pushes them to incoming queue
   251  */	
   252 void IPCommReaderThread::Run()
   253 {
   254 	if( m_Socket )
   255 		m_Running = true;
   257 	BYTE* buffer = new BYTE[m_TcpIpBufferSize];
   258 	while (m_Running)
   259 	{
   260 		//Reading from TCP/IP port
   261 		//Util::Debug("[IPCommReaderThread] try to read");
   262 		int bytes_read = -1;
   263 		bytes_read = m_Socket->ReceiveBytes(buffer, m_TcpIpBufferSize);
   264 		if (bytes_read < 0)
   265 		{
   266 			Stop();
   267 			break;
   268 		}
   269 		if (bytes_read > 0) 
   270 		{
   271 			Data* d = new Data((void *)buffer, bytes_read, Data::EData);
   272 			if (Util::GetVerboseLevel() == Util::debug)
   273 			{
   274 				char tmp[64];
   275 				sprintf(tmp, "m_Socket->ReceiveBytes (%d (dec) bytes):", d->GetLength());
   276 				string s(tmp);
   277 				Util::Debug(s);
   278 			}
   279 			m_Queue->push(d);
   280 			d = NULL;
   281 		}
   282 		Sleep(50);
   283 	}
   284 	delete[] buffer;
   285 	buffer = NULL;
   286 }
   288 void IPCommReaderThread::Stop()
   289 {
   290 	m_Running = false;
   291 }
   293 bool IPCommReaderThread::IsRunning()
   294 {
   295 	return m_Running;
   296 }
   300 //**********************************************************************************
   301 // Class IPCommWriterThread
   302 //
   303 // This thread is used to write data from outgoing queue to TCP/IP socket
   304 //**********************************************************************************
   306 IPCommWriterThread::IPCommWriterThread(SafeQueue<Data*>* q)
   307 	:m_Running(false),
   308 	m_Socket(NULL)
   309 {
   310 	m_Queue = q;
   311 }
   313 /*
   314  * This method contains the main execution loop which gets Data from outgoing queue and sends it to socket
   315  */
   316 void IPCommWriterThread::Run()
   317 {
   318 	if( m_Socket )
   319 		m_Running = true;
   321 	while (m_Running)
   322 	{
   323 		// Sending to TCP/IP port
   324 		//Util::Debug("[IPCommWriterThread] try to send");
   325 		try
   326 		{
   327 			Data* d = m_Queue->front(50);
   328 			char* p = (char *)d->GetData();
   329 			DWORD l = d->GetLength();
   331             if (Util::GetVerboseLevel() == Util::debug)
   332 			{
   333 				char tmp[64];
   334 				sprintf(tmp, "[IPCommWriterThread] HTI MsgSize = %d", l);
   335 				string s(tmp);
   336 				Util::Debug(s);
   337 			}
   339 			m_Socket->SendBytes((const unsigned char *)p, l);
   340 			m_Queue->pop();
   341 			delete d;
   342 			d = NULL;
   344 		} catch (TimeoutException te)
   345 		{
   346 		//Util::Debug("[IPCommWriterThread]timeout exception");
   347 		}
   348 	}
   349 }
   351 void IPCommWriterThread::Stop()
   352 {
   353 	m_Running = false;
   354 }
   356 bool IPCommWriterThread::IsRunning()
   357 {
   358 	return m_Running;
   359 }
   362 //**********************************************************************************
   363 // Class IPCommMonitorThread
   364 //
   365 // This thread creates and starts reader and writer threads
   366 // The thread also monitors if reader and writer threads are running and restarts them in case either isn't running
   367 //**********************************************************************************
   369 IPCommMonitorThread::IPCommMonitorThread(SafeQueue<Data*>* TxQueue,
   370 										 SafeQueue<Data*>* RxQueue,
   371 										 int localPort,
   372                                          string& remoteHost,
   373                                          int remotePort)
   374 	: m_Running(false),
   375 	m_TxQueue(TxQueue),
   376 	m_RxQueue(RxQueue),
   377 	m_ReaderThread(NULL),
   378 	m_WriterThread(NULL),
   379 	m_LocalPort(localPort),
   380     m_RemoteHost(remoteHost),
   381 	m_RemotePort(remotePort)
   382 {
   383 }
   385 IPCommMonitorThread::~IPCommMonitorThread()
   386 {
   387 	if(m_ReaderThread)
   388 	{
   389 		delete m_ReaderThread;
   390 		m_ReaderThread = NULL;
   391 	}
   392 	if(m_WriterThread)
   393 	{
   394 		delete m_WriterThread;
   395 		m_WriterThread = NULL;
   396 	}
   397 }
   400 /*
   401  * This method has two functionalities
   402  * -It waits for incoming connections if local port is defined
   403  * -It tries to connect to remote host if local host is not defined
   404  */
   405 void IPCommMonitorThread::Connect(Socket*& s)
   406 {
   407 	// This trickery here is because if there are no sockets (Socket::nofSockets_)
   408 	// WSACleanup gets called and then SOAP gets messed up.
   409 	// And creating a new socket for a new connection seems to work better
   410 	// than using the old when re-connecting / re-listening.
   411 	Socket* new_s = new Socket();
   412 	delete s;
   413 	s = new_s;
   415 	// If local port is defined start open listening socket
   416 	SocketServer ss;
   417 	if( m_LocalPort )
   418 	{
   419 		Util::Info("[IPComm] Listen for incoming connection...");
   420 		String remoteHost( "[IPComm] Connected! Remote host : " );
   421 		ss.Accept( s, m_LocalPort, 1, remoteHost );
   422 		Util::Info(remoteHost);
   423 	}
   424 	// If not start connecting
   425 	else
   426 	{
   427 		Util::Info("[IPComm] Connecting...");
   428 		ss.Connect( s, m_RemoteHost.c_str(), m_RemotePort );
   429 		Util::Info("[IPComm] Connected!");
   430 	}
   431 }
   433 /*
   434  * Main execution loop of thread
   435  * -Creates reader and writer threads and starts them
   436  * -Monitors if either reader or writer thread aren't running and restarts them if not
   437  */
   438 void IPCommMonitorThread::Run()
   439 {
   440 	Socket* s = NULL;
   442 	m_ReaderThread = new IPCommReaderThread( m_RxQueue, g_DataGatewayDefaultTcpIpBufferSize );
   443 	m_WriterThread = new IPCommWriterThread( m_TxQueue ) ;
   445 	m_Running = true;
   446 	while (m_Running)
   447 	{
   448 		// Reader thread should stop running when connection is lost
   449 		if( !m_ReaderThread->IsRunning() || !m_WriterThread->IsRunning() )
   450 		{
   451 			Util::Info( "[IPComm] Disconnected!" );
   453 			// Stop the treads
   454 			m_ReaderThread->Stop();
   455 		    WaitForSingleObject(m_ReaderThread->ThreadHandle(), g_MaximumShutdownWaitTime);
   456 			m_WriterThread->Stop();
   457 			WaitForSingleObject(m_WriterThread->ThreadHandle(), g_MaximumShutdownWaitTime);
   459 			// Try to connect again. 
   460 			BOOL connected = false;
   461 			while( m_Running && connected == false)
   462 			{
   463 				try{
   464 					Connect(s);
   465 					connected = true;
   466 					m_ReaderThread->m_Socket = s;
   467 					m_WriterThread->m_Socket = s;
   469 					// Start threads
   470 					m_ReaderThread->Start();
   471 					m_WriterThread->Start();
   472 				}
   473 				catch( char* ){
   474 					Sleep(1000);
   475 				}
   476 			}
   477 		}
   478 		Sleep(1000);
   479 	}
   481 	// Stop the treads
   482 	m_ReaderThread->Stop();
   483 	WaitForSingleObject(m_ReaderThread->ThreadHandle(), g_MaximumShutdownWaitTime);
   484 	m_WriterThread->Stop();
   485 	WaitForSingleObject(m_WriterThread->ThreadHandle(), g_MaximumShutdownWaitTime);
   486 	// and close the current socket
   487 	if( s )
   488 		delete s;
   489 }
   491 void IPCommMonitorThread::Stop()
   492 {
   493 	m_Running = false;
   494 	m_ReaderThread->Stop();
   495 	m_WriterThread->Stop();
   496 }
   498 bool IPCommMonitorThread::IsRunning()
   499 {
   500 	return m_Running;
   501 }
   503 // End of the file