hti/PC_Tools/DataGateway/SRC/datagateway.cpp
branchRCL_3
changeset 59 8ad140f3dd41
parent 0 a03f92240627
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/hti/PC_Tools/DataGateway/SRC/datagateway.cpp	Wed Oct 13 16:17:58 2010 +0300
@@ -0,0 +1,495 @@
+/*
+* Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies). 
+* All rights reserved.
+* This component and the accompanying materials are made available
+* under the terms of "Eclipse Public License v1.0"
+* which accompanies this distribution, and is available
+* at the URL "http://www.eclipse.org/legal/epl-v10.html".
+*
+* Initial Contributors:
+* Nokia Corporation - initial contribution.
+*
+* Contributors:
+* 
+* Description:
+*   This file contains the header file of the DataGatewaySocketWriterThread,
+*	DataGatewaySocketReaderThread, DataGatewayClientThread and DataGateway
+*	classes.
+*/
+
+// INCLUDES
+#include "datagateway.h"
+
+//**********************************************************************************
+// Class DataGatewaySocketWriterThread
+//
+// This thread is used to read Data from outgoing queue 
+// and write it to Socket(connected to program using DataGateway)
+//**********************************************************************************
+
+DataGatewaySocketWriterThread::DataGatewaySocketWriterThread(SafeQueue<Data*>* q,
+															 Socket* s)
+	: m_Running(true)
+{
+	m_Queue = q;
+	m_Socket = s;
+}
+
+/*
+ * Main loop of thread
+ * Reads Data from outgoing queue and writes it to Socket(connected to program using DataGateway)
+ */
+void DataGatewaySocketWriterThread::Run()
+{
+	while (m_Running)
+	{
+		// Sending to TCP/IP port
+		//Util::Debug("[DataGatewaySocketWriterThread] try to send");
+		try
+		{
+			Data* d = m_Queue->front(50);
+			char* p = (char *)d->GetData();
+			//DWORD l = (DWORD)((((BYTE)p[g_HtiOffsetMessageSize] << 8) | (BYTE)p[g_HtiOffsetMessageSize+1]));
+			DWORD l = d->GetLength();
+
+			if (Util::GetVerboseLevel() == Util::debug)
+			{
+				char tmp[64];
+				sprintf(tmp, "[DataGatewaySocketWriterThread] HTI MsgSize = %d", l);
+				string s(tmp);
+				Util::Debug(s);
+				//Util::Hex(p, min(16,d->GetLength()));
+			}
+
+			//m_Socket->SendBytes((const unsigned char *)&p[g_HtiOffsetMessageData], l);
+			m_Socket->SendBytes((const unsigned char *)p, l);
+			Util::Debug("[DataGatewaySocketWriterThread] msg send");
+			m_Queue->pop();
+			delete d;
+			d = NULL;
+
+		} catch (TimeoutException te)
+		{
+			//Util::Debug("[DataGatewaySocketWriterThread]timeout exception");
+		}
+	}
+}
+
+void DataGatewaySocketWriterThread::Stop()
+{
+	m_Running = false;
+}
+
+bool DataGatewaySocketWriterThread::IsRunning()
+{
+	return m_Running;
+}
+
+//**********************************************************************************
+// Class DataGatewaySocketReaderThread
+//
+// This thread is used to read incoming bytes from Socket(connected to program using DataGateway)
+// which it then encapsulates into Data objects and forwards to outgoing queue
+//**********************************************************************************
+
+DataGatewaySocketReaderThread::DataGatewaySocketReaderThread(SafeQueue<Data*>* q,
+															 long bufsize,
+															 Socket* s)
+	: m_Running(true)
+{
+	m_Queue = q;
+	m_Socket = s;
+	if (bufsize > 0)
+	{
+		m_TcpIpBufferSize = bufsize;
+	}
+	else
+	{
+		m_TcpIpBufferSize = g_DataGatewayDefaultTcpIpBufferSize;
+	}
+}
+
+/*
+ * Main loop of thread
+ * Reads bytes from Socket(connected to program using DataGateway), encapsulates them to Data object and puts these to outgoing queue
+ */
+void DataGatewaySocketReaderThread::Run()
+{
+	BYTE* buffer = new BYTE[m_TcpIpBufferSize];
+	while (m_Running)
+	{
+		// Reading from TCP/IP port
+		Util::Debug("[DataGatewaySocketReaderThread] try to read");
+		int bytes_read = -1;
+		bytes_read = m_Socket->ReceiveBytes(buffer, m_TcpIpBufferSize);
+		if (bytes_read < 0)
+		{
+			Stop();
+			break;
+		}
+		if (bytes_read > 0) 
+		{
+			Data* d = new Data((void *)buffer, bytes_read, Data::EData);
+			if (Util::GetVerboseLevel() == Util::debug)
+			{
+				char tmp[64];
+				sprintf(tmp, "m_Socket->ReceiveBytes (%d (dec) bytes):", d->GetLength());
+				string s(tmp);
+				Util::Debug(s);
+				//Util::Hex(p, d->GetLength());
+			}
+
+			m_Queue->push(d);
+			d = NULL;
+		}
+		Sleep(50);
+	}
+	delete[] buffer;
+	buffer = NULL;
+}
+
+void DataGatewaySocketReaderThread::Stop()
+{
+	m_Running = false;
+}
+
+bool DataGatewaySocketReaderThread::IsRunning()
+{
+	return m_Running;
+}
+
+//**********************************************************************************
+// Class DataGatewayClientThread
+//
+// This thread serves DataGateway's clients
+// Gets Data from incoming queue to which DataGatewaySocketReader has pushed it and forwards
+// them to CommChannelPlugin.
+// The thread also reads incoming data from CommChannelPlugin and forwards them to outgoing queue which
+// DataGatewaySocketWriter then reads
+//**********************************************************************************
+
+DataGatewayClientThread::DataGatewayClientThread(Socket** s,
+												 long bufsize,
+												 const string& commchannel)
+	: m_ReaderThread(&m_ReaderQueue, bufsize, *s),
+	  m_WriterThread(&m_WriterQueue, *s),
+  	  m_CommChannelPluginName(commchannel),
+	  m_Running(true)
+{
+	m_CCLateInit = true;
+	m_Socket = s;
+	if (bufsize > 0)
+	{
+		m_TcpIpBufferSize = bufsize;
+	}
+	else
+	{
+		m_TcpIpBufferSize = g_DataGatewayDefaultTcpIpBufferSize;
+	}
+}
+
+DataGatewayClientThread::DataGatewayClientThread(Socket** s,
+												 long bufsize,
+										         CommChannelPlugin** f)
+	: m_ReaderThread(&m_ReaderQueue, bufsize, *s),
+	  m_WriterThread(&m_WriterQueue, *s),
+  	  m_CommChannelPluginName((*f)->GetName()),
+	  m_Running(true)
+{
+	m_CCLateInit = false;
+	m_Socket = s;
+	m_CommChannelPlugin = *f;
+	if (bufsize > 0)
+	{
+		m_TcpIpBufferSize = bufsize;
+	}
+	else
+	{
+		m_TcpIpBufferSize = g_DataGatewayDefaultTcpIpBufferSize;
+	}
+}
+
+DataGatewayClientThread::~DataGatewayClientThread()
+{
+	Util::Debug("DataGatewayClientThread::~DataGatewayClientThread()");
+	if (m_Running)
+	{
+		Stop();
+	}
+}
+
+/*
+ * Main loop of thread
+ * Gets Data from incoming queue to which DataGatewaySocketReader has pushed it and forwards
+ * them to CommChannelPlugin.
+ * Reads incoming data from CommChannelPlugin and forwards them to outgoing queue which
+ * DataGatewaySocketWriter then reads
+ */
+void DataGatewayClientThread::Run()
+{
+	DWORD res;
+
+	if (m_CCLateInit)
+	{		
+		m_CommChannelPlugin = CommChannelPlugin::Instance(m_CommChannelPluginName);
+		if (m_CommChannelPlugin == NULL)
+		{
+			g_ErrorCode = ERR_DG_COMMCHANNEL;
+			return;
+		}
+		if ((res = m_CommChannelPlugin->Connect()) != NO_ERRORS)
+		{
+			Util::Error("[DataGateway] Error - Cannot connect to the target.");
+			m_CommChannelPlugin->Disconnect();
+			g_ErrorCode = res;
+			return;
+		}
+		Util::Info("[DataGateway] Communication Channel Plugin loaded succesfully");
+	}
+	
+	//Start DataGatewaySocketReader and DataGatewaySocketWriter threads
+	m_ReaderThread.Start();
+	m_WriterThread.Start();
+
+	// Flush comm input buffer;
+	Data* dummy;
+	while (m_CommChannelPlugin->Receive(&dummy) == NO_ERRORS) continue;
+	dummy = NULL;
+
+	while (m_Running)
+	{
+		if (!m_ReaderThread.IsRunning() ||
+			!m_WriterThread.IsRunning())
+		{
+			Stop();
+			break;
+		}
+			
+		// Receiving from TCP/IP port and
+		// sending to CommChannelPlugin
+		try {
+			Data* d = m_ReaderQueue.front(50);
+			m_ReaderQueue.pop();
+			m_CommChannelPlugin->Send(d);
+			d = NULL;
+		} catch (TimeoutException te) {}
+			
+		// Receiving from CommChannelPlugin and
+		// sending data to TCP/IP port. If message
+		// is error or control message it is also
+		// handled here.
+		Data* out;
+
+		if (m_CommChannelPlugin->Receive(&out) != NO_ERRORS) continue;
+
+		switch (out->GetType())
+		{
+			case Data::EData:
+			{
+				m_WriterQueue.push(out);
+				out = NULL;				
+			}
+			break;
+			case Data::EControl:
+			{
+				Util::Debug("ClientThread: Control Message Received");
+				switch (*(BYTE*)out->GetData())
+				{
+					case ControlPhonePowered:
+						{
+							Util::Info("[DataGateway] Phone powered up");
+						}
+					break;
+				}
+			}
+			break;
+			case Data::EError:
+			{
+				Util::Debug("ClientThread: Error Message Received");
+				Stop();
+			}
+			break;
+
+			default:
+				{
+					Util::Debug("ClientThread: Unknown Message Received");
+				}
+			break;
+		}
+		delete out;
+		out = NULL;
+	}
+	delete *m_Socket;
+	m_Socket = NULL;
+
+	if (m_CCLateInit)
+	{
+		m_CommChannelPlugin->Disconnect();
+		Util::Info("[DataGateway] Communication Channel Plugin unloaded");
+		m_CommChannelPlugin = NULL;
+	}
+}
+
+void DataGatewayClientThread::Stop()
+{
+	m_Running = false;
+	m_WriterThread.Stop();
+	m_ReaderThread.Stop();
+	HANDLE handles[2];
+	handles[0] = m_WriterThread.ThreadHandle();
+	handles[1] = m_ReaderThread.ThreadHandle();
+	WaitForMultipleObjects(2, handles, TRUE, g_MaximumShutdownWaitTime);
+}
+
+//**********************************************************************************
+// Class DataGateway
+//
+// This is the main thread of DataGateway
+//**********************************************************************************
+
+DataGateway::DataGateway(int port,
+						 long bufsize,
+						 const string& commchannel,
+						 bool stayalive,
+						 bool cclateinit)
+	: m_TcpIpPort(port),
+	  m_TcpIpBufferSize(bufsize),
+  	  m_CommChannelPluginName(commchannel),
+	  m_StayAlive(stayalive),
+	  m_CCLateInit(cclateinit),
+	  m_Running(true)
+{
+	m_CommChannelPlugin = NULL;
+}
+
+/*
+ * Main loop of DataGateway
+ * This loop:
+ * -creates instance of CommChannelPlugin if lateinit isn't set on
+ * -starts listening to socket for incoming connections
+ * -after connection has arrived starts DataGatewayClient thread to serve the client
+ */
+void DataGateway::Run()
+{
+	DWORD res;
+	try
+	{
+		if (Util::GetVerboseLevel() >= Util::info)
+		{
+			char tmp[256];
+			sprintf(tmp, "[DataGateway] Using TCP/IP port %d", m_TcpIpPort);
+			string s(tmp);
+			Util::Info(s);
+			
+			sprintf(tmp, "[DataGateway] TCP/IP receive buffer size is %d bytes", m_TcpIpBufferSize);
+			s.assign(tmp);
+			Util::Info(s);
+
+			sprintf(tmp, "[DataGateway] Loading Communication Channel Plugin for [%s]", m_CommChannelPluginName.c_str());
+			s.assign(tmp);
+			Util::Info(s);
+		}
+
+		
+		//Util::Info("[DataGateway] TCP/IP port opened");
+		
+		if (!m_CCLateInit)
+		{
+		    //if not lateinit, CommChannelPlugin may be started
+			m_CommChannelPlugin = CommChannelPlugin::Instance(m_CommChannelPluginName);
+			if (m_CommChannelPlugin == NULL)
+			{
+				throw UtilError("[DataGateway] Error loading Communication Channel.", ERR_DG_COMMCHANNEL);
+			}
+			if ((res = m_CommChannelPlugin->Connect()) != NO_ERRORS)
+			{
+				m_CommChannelPlugin->Disconnect();
+				throw UtilError("[DataGateway] Error connecting to the target.", res);
+			}
+			Util::Info("[DataGateway] Communication Channel Plugin loaded succesfully");
+		}
+		else
+		{
+			Util::Info("[DataGateway] Communication Channel Plugin uses late initialization.");
+		}
+
+		while (m_Running)
+		{
+			g_ErrorCode = NO_ERRORS;
+			Util::Info("[DataGateway] Waiting connection");
+			Socket* s = new Socket(); 
+			SocketServer in;
+			// will use the socket for listening and return a new socket when connection 
+			// is established
+			String remoteHost( "[DataGateway] Connection established! Remote host : " );
+			in.Accept(s, m_TcpIpPort, 1, remoteHost); 
+			Util::Info(remoteHost);
+			DataGatewayClientThread* client;
+			if (m_CCLateInit)
+			{
+				// passes ownership of 's'
+				client = new DataGatewayClientThread(&s, m_TcpIpBufferSize, m_CommChannelPluginName);
+			}
+			else
+			{
+				// passes ownership of 's'
+				client = new DataGatewayClientThread(&s, m_TcpIpBufferSize, &m_CommChannelPlugin);
+			}
+			client->Start();
+
+			HANDLE handles[2];
+			handles[0] = client->ThreadHandle();
+			handles[1] = m_ShutdownEvent.EventHandle();
+			DWORD dwResult = WaitForMultipleObjects(2, handles, FALSE, INFINITE);
+			switch (dwResult)
+			{
+				case WAIT_OBJECT_0 + 0:
+				{
+					Util::Debug("DataGateway::Run() Client thread stopped");
+				}
+				break;
+				case WAIT_OBJECT_0 + 1:
+				{
+					Util::Debug("DataGateway::Run() Request to shutdown");
+					client->Stop();
+					WaitForSingleObject(client->ThreadHandle(), g_MaximumShutdownWaitTime);
+				}
+				break;
+			}
+			Util::Info("[DataGateway] Connection closed.");
+			delete client;
+			client = NULL;
+			
+			if (!m_StayAlive) break;
+		}
+		if (!m_CCLateInit)
+		{
+			m_CommChannelPlugin->Disconnect();
+			Util::Info("[DataGateway] Communication Channel Plugin unloaded");
+			m_CommChannelPlugin = NULL;
+		}
+	} catch (char* s) {
+		char tmp[64];
+		sprintf(tmp, "[DataGateway] Error opening TCP/IP port - %s", s);
+		Util::Error(tmp);
+		g_ErrorCode = ERR_DG_SOCKET;
+	} catch (UtilError ue) {
+		Util::Error(ue.iError, ue.iResult);
+		g_ErrorCode = ue.iResult;
+	}
+
+	Util::Info("[DataGateway] Closed");
+}
+
+void DataGateway::Stop()
+{
+	m_Running = false;
+	m_ShutdownEvent.Set();
+}
+
+bool DataGateway::IsRunning()
+{
+	return m_Running;
+}
+
+// End of the file
\ No newline at end of file