connectivity/com.nokia.tcf/native/TCFNative/TCFServer/ConnectionImpl.cpp
changeset 60 9d2210c8eed2
child 366 b054461d2f85
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/connectivity/com.nokia.tcf/native/TCFNative/TCFServer/ConnectionImpl.cpp	Mon Apr 06 15:18:48 2009 -0500
@@ -0,0 +1,907 @@
+/*
+* Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies).
+* All rights reserved.
+* This component and the accompanying materials are made available
+* under the terms of the License "Eclipse Public License v1.0"
+* which accompanies this distribution, and is available
+* at the URL "http://www.eclipse.org/legal/epl-v10.html".
+*
+* Initial Contributors:
+* Nokia Corporation - initial contribution.
+*
+* Contributors:
+*
+* Description: 
+*
+*/
+// ConnectionImpl.cpp: implementation of the CConnectionImpl class.
+//
+//////////////////////////////////////////////////////////////////////
+
+#include "stdafx.h"
+#include "ConnectionImpl.h"
+#include "RegistryImpl.h"
+#include "ServerManager.h"
+
+#ifdef _DEBUG
+extern BOOL gDoLogging;
+#endif
+
+#define LOG_CONNECTION
+#if defined(LOG_CONNECTION) && defined(_DEBUG)
+#define TCDEBUGOPEN() if (gDoLogging) { this->m_DebugLog->WaitForAccess(); }
+#define TCDEBUGLOGS(s) if (gDoLogging) { sprintf(this->m_DebugLogMsg,"%s", s); this->m_DebugLog->log(this->m_DebugLogMsg); }
+#define TCDEBUGLOGA1(s, a1) if (gDoLogging) { sprintf(this->m_DebugLogMsg, s, a1); this->m_DebugLog->log(this->m_DebugLogMsg); }
+#define TCDEBUGLOGA2(s, a1, a2) if (gDoLogging) { sprintf(this->m_DebugLogMsg, s, a1, a2); this->m_DebugLog->log(this->m_DebugLogMsg); }
+#define TCDEBUGLOGA3(s, a1, a2, a3) if (gDoLogging) { sprintf(this->m_DebugLogMsg, s, a1, a2, a3); this->m_DebugLog->log(this->m_DebugLogMsg); }
+#define TCDEBUGLOGA4(s, a1, a2, a3, a4) if (gDoLogging) { sprintf(this->m_DebugLogMsg, s, a1, a2, a3, a4); this->m_DebugLog->log(this->m_DebugLogMsg); }
+#define TCDEBUGCLOSE() if (gDoLogging) { this->m_DebugLog->ReleaseAccess(); }
+#else
+#define TCDEBUGOPEN()
+#define TCDEBUGLOGS(s)
+#define TCDEBUGLOGA1(s, a1)
+#define TCDEBUGLOGA2(s, a1, a2)
+#define TCDEBUGLOGA3(s, a1, a2, a3)
+#define TCDEBUGLOGA4(s, a1, a2, a3, a4)
+#define TCDEBUGCLOSE()
+#endif
+
+//////////////////////////////////////////////////////////////////////
+// Construction/Destruction
+//////////////////////////////////////////////////////////////////////
+
+CConnectionImpl::CConnectionImpl()
+{
+	m_ConnectSettings = NULL;
+	m_ClientList = NULL;
+	m_Status = eDisconnected;
+	m_Registry = NULL;
+	m_ConnectionID = 0;
+	m_OsError = 0;
+	m_BaseComm = NULL;
+	m_BaseProtocol = NULL;
+	m_BaseCommHandle = NULL;
+	m_BaseProtocolHandle = NULL;
+
+	// message processing thread flags and handles
+	m_MessageProcessorState = MP_NONE;
+	m_ExitMessageProcessor = false;
+	m_PauseMessageProcessing = false;
+	m_StartMessageProcessing = false;
+	m_hMessageProcessorExittedEvent = NULL;
+	m_hMessageProcessorStoppedEvent = NULL;
+	m_hMessageProcessorStartedEvent = NULL;
+	m_hMessageProcessorThread = NULL;
+	m_dwMessageProcessorThreadId = 0;
+
+	m_NextRetryTime = m_RetryTimeoutTime = 0;
+
+	m_NextFlushFileTime = 0;
+}
+
+CConnectionImpl::CConnectionImpl(ConnectData conData, DWORD connectionId)
+{
+#ifdef _DEBUG
+	if (gDoLogging)
+	{
+		m_DebugLog = new TCDebugLog("TCF_ConnectionLog", connectionId);
+		m_DebugLog2 = new TCDebugLog("TCF_ProcessorLog", connectionId);
+	}
+	else
+	{
+		m_DebugLog = NULL;
+		m_DebugLog2 = NULL;
+	}
+#else
+	m_DebugLog = NULL;
+	m_DebugLog2 = NULL;
+#endif
+
+	TCDEBUGOPEN();
+	TCDEBUGLOGA1("CConnectionImpl::CConnectionImpl id = %d\n", connectionId);
+
+	m_ConnectSettings = new ConnectData();
+
+	memcpy(m_ConnectSettings, &conData, sizeof(ConnectData));
+
+	m_ClientList = new ClientList();
+	m_ClientList->clear();
+	m_Status = eDisconnected;
+	m_Registry = new CRegistryImpl(connectionId);
+	m_ConnectionID = connectionId;
+	m_BaseComm = NULL;
+	m_BaseProtocol = NULL;
+	m_BaseCommHandle = NULL;
+	m_BaseProtocolHandle = NULL;
+
+	// message processing thread flags and handles
+	m_MessageProcessorState = MP_NONE;
+	m_ExitMessageProcessor = false;
+	m_PauseMessageProcessing = false;
+	m_StartMessageProcessing = false;
+
+	// create named events
+	char eventName[100];
+
+	sprintf(eventName, "%s%d", MESSAGEPROCESSOR_EXITEVENT_BASENAME, connectionId);
+	m_hMessageProcessorExittedEvent = ::CreateEvent(NULL, FALSE, FALSE, eventName);
+
+	sprintf(eventName, "%s%d", MESSAGEPROCESSOR_STOPEVENT_BASENAME, connectionId);
+	m_hMessageProcessorStoppedEvent = ::CreateEvent(NULL, FALSE, FALSE, eventName);
+
+	sprintf(eventName, "%s%d", MESSAGEPROCESSOR_STARTEVENT_BASENAME, connectionId);
+	m_hMessageProcessorStartedEvent = ::CreateEvent(NULL, FALSE, FALSE, eventName);
+
+	m_hMessageProcessorThread = NULL;
+	m_dwMessageProcessorThreadId = 0;
+
+	m_NextRetryTime = m_RetryTimeoutTime = 0;
+
+	m_NextFlushFileTime = 0;
+	m_OsError = 0;
+
+	TCDEBUGCLOSE();
+}
+CConnectionImpl::~CConnectionImpl()
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGS("CConnectionImpl::~CConnectionImpl\n");
+
+	// terminate the message processor thread if running
+
+	if (m_hMessageProcessorThread != NULL)
+	{
+		BOOL t = ::TerminateThread(m_hMessageProcessorThread, 0);
+		::CloseHandle(m_hMessageProcessorThread);
+	}
+
+	if (m_hMessageProcessorExittedEvent != NULL)
+	{
+		::CloseHandle(m_hMessageProcessorExittedEvent);
+	}
+
+	if (m_hMessageProcessorStoppedEvent != NULL)
+	{
+		::CloseHandle(m_hMessageProcessorStoppedEvent);
+	}
+
+	if (m_ConnectSettings)
+		delete m_ConnectSettings;
+
+
+	if (m_ClientList)
+	{
+		m_ClientList->clear();
+		delete m_ClientList;
+	}
+
+	if (m_Registry)
+	{
+		delete m_Registry;
+	}
+
+	if (m_BaseComm)
+	{
+		delete m_BaseComm;
+	}
+
+	if (m_BaseCommHandle)
+	{
+		::FreeLibrary(m_BaseCommHandle);
+	}
+	if (m_BaseProtocol)
+	{
+		delete m_BaseProtocol;
+	}
+
+	if (m_BaseProtocolHandle)
+	{
+		::FreeLibrary(m_BaseProtocolHandle);
+	}
+
+	TCDEBUGCLOSE();
+	if (m_DebugLog)
+		delete m_DebugLog;
+	if (m_DebugLog2)
+		delete m_DebugLog2;
+
+}
+
+BOOL CConnectionImpl::IsEqual(CConnection* connection)
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGS("CConnectionImpl::IsEqual\n");
+
+	BOOL equal = FALSE;
+
+	if (strcmp(m_ConnectSettings->connectType, connection->m_ConnectSettings->connectType) == 0)
+	{
+		if (m_BaseComm)
+		{
+			if (m_BaseComm->IsConnectionEqual(connection->m_ConnectSettings))
+			{
+				equal = TRUE;
+			}
+		}
+		else
+		{
+			equal = TRUE;
+		}
+	}
+
+	TCDEBUGCLOSE();
+	return equal;
+}
+
+BOOL CConnectionImpl::IsEqual(pConnectData pConData)
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGS("CConnectionImpl::IsEqual\n");
+
+	BOOL equal = FALSE;
+
+	if (strcmp(m_ConnectSettings->connectType, pConData->connectType) == 0)
+	{
+		if (m_BaseComm)
+		{
+			if (m_BaseComm->IsConnectionEqual(pConData))
+			{
+				equal = TRUE;
+			}
+		}
+		else
+		{
+			equal = TRUE;
+		}
+	}
+	TCDEBUGCLOSE();
+	return equal;
+}
+
+long CConnectionImpl::DoConnect()
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGS("CConnectionImpl::DoConnect\n");
+
+	long ret = TCAPI_ERR_NONE;
+
+	if (m_BaseComm && m_BaseProtocol)
+	{
+		ret = m_BaseComm->OpenPort();
+		if (ret != TCAPI_ERR_NONE)
+		{
+			m_OsError = m_BaseComm->m_lastCommError;
+			TCDEBUGLOGA1(" m_BaseComm->OpenPort = %d\n", ret);
+		}
+	}
+	else
+	{
+		ret = TCAPI_ERR_UNKNOWN_MEDIA_TYPE;
+	}
+
+	if (ret == TCAPI_ERR_NONE)
+	{
+		m_Status = eConnected;
+
+		TCDEBUGCLOSE();
+
+		StartProcessing();
+	}
+	else
+	{
+//		if (m_BaseComm != NULL)
+//		{
+//			delete m_BaseComm;
+//			m_BaseComm = NULL;
+//		}
+		TCDEBUGCLOSE();
+	}
+	return ret;
+}
+
+long CConnectionImpl::DoDisconnect()
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGS("CConnectionImpl::DoDisconnect\n");
+
+	long ret = TCAPI_ERR_NONE;
+	if (IsConnected())
+	{
+		ret = m_BaseComm->ClosePort();
+//		delete m_BaseComm;
+//		m_BaseComm = NULL;
+	}
+	m_Status = eDisconnected;
+
+	TCDEBUGCLOSE();
+	return ret;
+}
+
+BOOL CConnectionImpl::AddClient(CClient* client)
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGS("CConnectionImpl::AddClient\n");
+
+	BOOL ok = TRUE;
+
+	m_ClientList->push_back(client);
+
+	TCDEBUGCLOSE();
+	return ok;
+}
+
+long CConnectionImpl::DoSendMessage(long encodeOption, BYTE protocolVersion, BOOL useMsgId, BYTE msgId, DWORD msgLength, BYTE* pMsg)
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGS("CConnectionImpl::DoSendMessage\n");
+
+	long err = TCAPI_ERR_NONE;
+	if (IsRetryInProgress())
+	{
+		err = TCAPI_ERR_COMM_RETRY_IN_PROGRESS;
+	}
+	else if (IsRetryTimedOut())
+	{
+		err = TCAPI_ERR_COMM_TIMEOUT;
+	}
+	else if (m_Status == eConnected)
+	{
+		BYTE* encodedMessage = new BYTE[msgLength + 40]; // add enough for header (msgLength may be 0)
+		// if msgLength == 0, then encodeOption SHOULD be ENCODE_FORMAT since com expects to send something!
+		if (encodeOption == ENCODE_FORMAT)
+		{
+#ifdef _DEBUG
+			char msg[200]; msg[0] = '\0';
+			int len = (msgLength > 30) ? 30 : msgLength;
+			for (int i = 0; i < len; i ++)
+			{
+				sprintf(msg, "%s%02.2x ", msg, pMsg[i]);
+			}
+			sprintf(msg, "%s\n", msg);
+			TCDEBUGLOGS(msg);
+#endif
+			// msgLength maybe 0 and pMsg maybe NULL (we're not sending a raw message, just a protocol header)
+			msgLength = m_BaseProtocol->EncodeMessage(pMsg, msgLength, protocolVersion, msgId, encodedMessage, msgLength+40);
+#ifdef _DEBUG
+			msg[0] = '\0';
+			len = (msgLength > 30) ? 30 : msgLength;
+			for (i = 0; i < len; i ++)
+			{
+				sprintf(msg, "%s%02.2x ", msg, encodedMessage[i]);
+			}
+			sprintf(msg, "%s\n", msg);
+			TCDEBUGLOGS(msg);
+#endif
+			err = m_BaseComm->SendDataToPort(msgLength, encodedMessage);
+		}
+		else
+		{
+#ifdef _DEBUG
+			char msg[200]; msg[0] = '\0';
+			int len = (msgLength > 30) ? 30 : msgLength;
+			for (int i = 0; i < len; i ++)
+			{
+				sprintf(msg, "%s%02.2x ", msg, pMsg[i]);
+			}
+			sprintf(msg, "%s\n", msg);
+			TCDEBUGLOGS(msg);
+#endif
+			// msgLength != 0 and pMsg != NULL
+			err = m_BaseComm->SendDataToPort(msgLength, pMsg);
+		}
+		delete[] encodedMessage;
+
+		TCDEBUGLOGS("CConnectionImpl::DoSendMessage done\n");
+		if (err == TCAPI_ERR_COMM_ERROR)
+		{
+			EnterRetryPeriod(err, true, m_BaseComm->m_lastCommError);
+			m_OsError = m_BaseComm->m_lastCommError;
+		}
+	}
+	else
+	{
+		err = TCAPI_ERR_MEDIA_NOT_OPEN;
+	}
+
+	TCDEBUGLOGA1("CConnectionImpl::DoSendMessage err = %d\n", err);
+	TCDEBUGCLOSE();
+	return err;
+}
+
+long CConnectionImpl::DoRetryProcessing()
+{
+	long err = TCAPI_ERR_NONE;
+
+	// if not connected
+	//   return no error
+	if (m_BaseComm == NULL /*|| m_BaseComm->IsConnected() == false*/)
+		return TCAPI_ERR_MEDIA_NOT_OPEN;
+
+	// if retry not in progress && retry not timed out
+	//   return no error
+	if (!IsRetryInProgress() && !IsRetryTimedOut())
+		return TCAPI_ERR_NONE;
+
+//	TCDEBUGOPEN();
+//	TCDEBUGLOGS("CConnectionImpl::DoRetryProcessing\n");
+//	TCDEBUGCLOSE();
+	// if retry timeout flag already set
+	//   return timeout error
+	if (IsRetryTimedOut())
+		return TCAPI_ERR_COMM_TIMEOUT;
+
+	// get current time
+	time_t ctime;
+	time(&ctime);
+	// if retry timeout period has expired
+	if (ctime >= m_RetryTimeoutTime)
+	{
+		TCDEBUGOPEN();
+		TCDEBUGLOGS("CConnectionImpl::DoRetryProcessing retry timeout\n");
+		TCDEBUGCLOSE();
+		// send timeout error to all clients
+		NotifyClientsCommError(TCAPI_ERR_COMM_TIMEOUT);
+		// close comm port
+		m_BaseComm->ClosePort();
+		// set retry timeout flag
+		SetRetryTimedOut();
+		// return retry timeout error
+		err = TCAPI_ERR_COMM_TIMEOUT;
+	}
+	// else if retry time has passed
+	else if (ctime >= m_NextRetryTime)
+	{
+		TCDEBUGOPEN();
+		TCDEBUGLOGS("CConnectionImpl::DoRetryProcessing retry time\n");
+		TCDEBUGCLOSE();
+		// close comm port
+		// reopen comm port
+		m_BaseComm->ClosePort();
+		int openErr = m_BaseComm->OpenPort();
+		// if comm error
+		if (openErr != TCAPI_ERR_NONE)
+		{
+			// set next retry time
+			// return comm error
+			m_NextRetryTime = ctime + m_ConnectSettings->retryInterval;
+			err = TCAPI_ERR_COMM_RETRY_IN_PROGRESS;
+			m_OsError = m_BaseComm->m_lastCommError;
+		}
+		else
+		{
+		TCDEBUGOPEN();
+		TCDEBUGLOGS("CConnectionImpl::DoRetryProcessing reconnected\n");
+		TCDEBUGCLOSE();
+			// send reconnect warning to all clients
+			NotifyClientsCommError(TCAPI_INFO_COMM_RECONNECTED);
+			// set connected
+			SetConnected();
+			err = TCAPI_ERR_NONE;
+		}
+	} 
+	else // still in retry
+	{
+		err = TCAPI_ERR_COMM_RETRY_IN_PROGRESS;
+	}
+
+
+//	TCDEBUGOPEN();
+//	TCDEBUGLOGA1("CConnectionImpl::DoRetryProcessing err = %d\n", err);
+//	TCDEBUGCLOSE();
+	return err;
+}
+long CConnectionImpl::EnterRetryPeriod(long commErr, bool passOsErr, DWORD osErr)
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGS("CConnectionImpl::EnterRetryPeriod\n");
+	TCDEBUGCLOSE();
+
+	long err = TCAPI_ERR_NONE;
+
+	// set next retry time
+	time_t ctime;
+	time(&ctime);
+	m_NextRetryTime = ctime + m_ConnectSettings->retryInterval;
+	// set retry timeout time
+	m_RetryTimeoutTime = ctime + m_ConnectSettings->retryTimeout;
+	// send comm error to all clients
+	NotifyClientsCommError(commErr, passOsErr, osErr);
+	// set retry in progress flag
+	SetRetryInProgress();
+
+	return err;
+}
+
+BOOL CConnectionImpl::RemoveClient(CClient* client)
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGS("CConnectionImpl::RemoveClient\n");
+
+	BOOL found = FALSE;
+
+	if (m_ClientList->size() != 0)
+	{
+		ClientList::iterator iter;
+		for (iter = m_ClientList->begin(); iter != m_ClientList->end(); iter++)
+		{
+			if ((*iter)->GetClientId() == client->GetClientId())
+			{
+				m_ClientList->erase(iter);
+				found = TRUE;
+				break;
+			}
+		}
+	}
+
+	TCDEBUGCLOSE();
+	return found;
+}
+
+BOOL CConnectionImpl::ExitProcessing()
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGS("CConnectionImpl::ExitProcessing\n");
+
+	// exit the messageprocessing thread
+	if (m_hMessageProcessorThread != NULL)
+	{
+		m_MessageProcessorState = MP_EXIT;
+
+		m_StartMessageProcessing = false;
+		m_PauseMessageProcessing = true;
+		m_ExitMessageProcessor = true;
+		DWORD waitStatus = ::WaitForSingleObject(m_hMessageProcessorExittedEvent, MESSAGEPROCESSOR_EVENTWAIT_TIMEOUT);
+		TCDEBUGLOGA1("CConnectionImpl::ExitProcessing waitStatus=%x\n", waitStatus);
+		::CloseHandle(m_hMessageProcessorThread);
+		m_hMessageProcessorThread = NULL;
+	}
+	
+	TCDEBUGCLOSE();
+	return TRUE;
+}
+
+BOOL CConnectionImpl::StartProcessing()
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGS("CConnectionImpl::StartProcessing\n");
+
+	// starts processing thread
+	if (m_hMessageProcessorThread == NULL)
+	{
+		m_MessageProcessorState = MP_PAUSE;
+
+		m_ExitMessageProcessor = false;
+		m_StartMessageProcessing = false;
+		m_PauseMessageProcessing = false;
+		// TODO: create thread
+		m_hMessageProcessorThread = ::CreateThread(
+			NULL,
+			0,
+			(LPTHREAD_START_ROUTINE) MessageProcessor,
+			this,
+			0,
+			&m_dwMessageProcessorThreadId);
+	}
+
+	TCDEBUGCLOSE();
+	return PauseProcessing();//RestartProcessing();
+}
+
+BOOL CConnectionImpl::PauseProcessing()
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGS("CConnectionImpl::PauseProcessing\n");
+
+	// tells the processing thread to pause
+	if (m_hMessageProcessorThread != NULL)
+	{
+		m_MessageProcessorState = MP_PAUSE;
+
+		m_ExitMessageProcessor = false;
+		m_StartMessageProcessing = false;
+		m_PauseMessageProcessing = true;
+		DWORD waitStatus = ::WaitForSingleObject(m_hMessageProcessorStoppedEvent, MESSAGEPROCESSOR_EVENTWAIT_TIMEOUT);
+		TCDEBUGLOGA1("CConnectionImpl::PauseProcessing waitStatus=%x\n", waitStatus);
+	}
+	
+	TCDEBUGCLOSE();
+	return TRUE;
+}
+
+BOOL CConnectionImpl::RestartProcessing()
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGS("CConnectionImpl::RestartProcessing\n");
+
+	// tell the processing thread to restart
+	if (m_hMessageProcessorThread != NULL)
+	{
+		m_MessageProcessorState = MP_START;
+
+		m_ExitMessageProcessor = false;
+		m_StartMessageProcessing = true;
+		m_PauseMessageProcessing = false;
+		DWORD waitStatus = ::WaitForSingleObject(m_hMessageProcessorStartedEvent, MESSAGEPROCESSOR_EVENTWAIT_TIMEOUT);
+		TCDEBUGLOGA1("CConnectionImpl::RestartProcessing waitStatus=%x\n", waitStatus);
+	}
+	
+	TCDEBUGCLOSE();
+	return TRUE;
+}
+
+BOOL CConnectionImpl::RemoveClientFromRegistry(CClient* client)
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGS("CConnectionImpl::RemoveClientFromRegistry\n");
+	TCDEBUGCLOSE();
+
+	return m_Registry->RemoveClient(client);
+}
+
+BOOL CConnectionImpl::AddClientToRegistry(CClient* client, long numberIds, BYTE* ids)
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGS("CConnectionImpl::AddClientToRegistry\n");
+	TCDEBUGCLOSE();
+
+	return m_Registry->AddClient(client, numberIds, ids);
+}
+
+void CConnectionImpl::NotifyClientsCommError(long tcfError, bool passOsError, DWORD osError)
+{
+//	TCDEBUGOPEN();
+//	TCDEBUGLOGS("CConnectionImpl::NotifyClientsCommError\n");
+//	TCDEBUGCLOSE();
+
+	if (m_ClientList->size() != 0)
+	{
+		ClientList::iterator iter;
+		for (iter = m_ClientList->begin(); iter != m_ClientList->end(); iter++)
+		{
+			CErrorMonitor* errorMonitor = (*iter)->m_ErrorMonitor;
+			errorMonitor->PutError(tcfError, passOsError, osError);
+		}
+	}
+}
+BOOL CConnectionImpl::HasVersion()
+{
+	BOOL found = FALSE;
+
+	if (m_BaseComm && m_BaseComm->HasVersion())
+		found = TRUE;
+
+	return found;
+}
+void CConnectionImpl::GetVersion(char* version)
+{
+	if (HasVersion()) {
+		m_BaseComm->GetVersion(version);
+	}
+}
+
+void CConnectionImpl::UnLockAllDestinations()
+{
+	if (m_ClientList->size() != 0)
+	{
+		ClientList::iterator iter;
+		for (iter = m_ClientList->begin(); iter != m_ClientList->end(); iter++)
+		{
+			CInputStream* inputStream = (*iter)->m_InputStream;
+			CMessageFile* file = (*iter)->m_MessageFile;
+			if (inputStream != NULL) 
+			{
+				inputStream->UnLockStream();
+			}
+			else if (file != NULL)
+			{
+				file->UnLockMessageFile();
+			}
+		}
+	}
+}
+
+#define LOG_MPROCESSOR
+#if defined(LOG_MPROCESSOR) && defined(_DEBUG)
+#define MPLOGOPEN() if (gDoLogging) { pThis->m_DebugLog2->WaitForAccess(); }
+#define MPLOGS(s) if (gDoLogging) { sprintf(pThis->m_DebugLogMsg2,"%s", s); pThis->m_DebugLog2->log(pThis->m_DebugLogMsg2); }
+#define MPLOGA1(s, a1) if (gDoLogging) { sprintf(pThis->m_DebugLogMsg2, s, a1); pThis->m_DebugLog2->log(pThis->m_DebugLogMsg2); }
+#define MPLOGA2(s, a1, a2) if (gDoLogging) { sprintf(pThis->m_DebugLogMsg2, s, a1, a2); pThis->m_DebugLog2->log(pThis->m_DebugLogMsg2); }
+#define MPLOGA3(s, a1, a2, a3) if (gDoLogging) { sprintf(pThis->m_DebugLogMsg2, s, a1, a2, a3); pThis->m_DebugLog2->log(pThis->m_DebugLogMsg2); }
+#define MPLOGCLOSE() if (gDoLogging) { pThis->m_DebugLog2->ReleaseAccess(); }
+#else
+#define MPLOGOPEN()
+#define MPLOGS(s)
+#define MPLOGA1(s, a1)
+#define MPLOGA2(s, a1, a2)
+#define MPLOGA3(s, a1, a2, a3)
+#define MPLOGCLOSE()
+#endif
+
+DWORD WINAPI CConnectionImpl::MessageProcessor(LPVOID lpParam)
+{
+	CConnectionImpl* pThis = (CConnectionImpl*)lpParam;
+
+	MPLOGOPEN();
+	MPLOGS("MessageProcessor start thread\n");
+
+	bool processing = false;
+	long err = TCAPI_ERR_NONE;
+	DWORD pollSize = 0;
+
+	while (pThis->m_MessageProcessorState != MP_EXIT)
+	{
+		if (pThis->m_MessageProcessorState == MP_PAUSE)
+		{
+			MPLOGS("MessageProcessor pause\n");
+
+			processing = false; 
+			pThis->m_PauseMessageProcessing = false;
+			pThis->m_MessageProcessorState = MP_NONE;
+			BOOL ok = ::SetEvent(pThis->m_hMessageProcessorStoppedEvent);
+		}
+
+		if (pThis->IsRetryInProgress())
+			err = pThis->DoRetryProcessing();
+		else if (pThis->IsRetryTimedOut())
+			err = TCAPI_ERR_COMM_TIMEOUT;
+
+		if (processing && err == TCAPI_ERR_NONE)
+		{
+			if (pThis->m_BaseComm && pThis->m_BaseComm->IsConnected())
+			{
+				err = pThis->m_BaseComm->PollPort(pollSize);
+				MPLOGA2("MessageProcessor PollPort = %d pollsize = %d\n", err, pollSize);
+				if (err != TCAPI_ERR_NONE)
+				{
+					MPLOGA2("MessageProcessor  err = %d osError = %d\n", err, pThis->m_BaseComm->m_lastCommError);
+					pThis->EnterRetryPeriod(err, true, pThis->m_BaseComm->m_lastCommError);
+				}
+				else
+				{
+					if (pollSize == 0)
+					{
+						Sleep(1);
+					}
+					else
+					{
+						long numberProcessed = 0;
+//						MPLOGA1("MessageProcessor ProcessBuffer pRegistry = %x\n", pThis->m_Registry);
+						err = pThis->m_BaseComm->ProcessBuffer(pThis, pThis->m_Registry, numberProcessed);
+
+						MPLOGA2("MessageProcessor ProcessBuffer err = %d number = %d\n", err, numberProcessed);
+
+						if (err == TCAPI_ERR_COMM_ERROR)
+						{
+							// for this error we have os error, but we probably caught this in PollPort already
+							pThis->EnterRetryPeriod(err, true, pThis->m_BaseComm->m_lastCommError);
+						}
+						else if (err != TCAPI_ERR_NONE)
+						{
+							// all clients already notified in ProcessBuffer
+							err = TCAPI_ERR_NONE;
+						}
+						pThis->UnLockAllDestinations(); // unlock all input streams, if they became locked during AddMessage()
+//						Sleep(1);
+					}
+				}
+//				MPLOGS("MessageProcessor FlushAllClientMessageFiles\n");
+				pThis->FlushAllClientMessageFiles();
+			}
+			else
+			{
+				// basecom not connected
+				Sleep(1);
+			}
+		}
+		else
+		{
+			// processing is not being done
+			Sleep(1);
+		}
+		if (pThis->m_MessageProcessorState == MP_START)
+		{
+			MPLOGS("MessageProcessor start\n");
+
+			processing = true;
+			pThis->m_StartMessageProcessing = false;
+			pThis->m_MessageProcessorState = MP_PROCESSING;
+			BOOL ok = ::SetEvent(pThis->m_hMessageProcessorStartedEvent);
+		}
+	}
+	// signal we're stopping
+	pThis->m_ExitMessageProcessor = false;
+	pThis->m_MessageProcessorState = MP_NONE;
+	::SetEvent(pThis->m_hMessageProcessorExittedEvent);
+
+	MPLOGS("MessageProcessor exit thread\n");
+	MPLOGCLOSE();
+
+	return 0;
+}
+
+void CConnectionImpl::FlushAllClientMessageFiles()
+{
+	DWORD cTick = GetTickCount();
+
+//	MPLOGA2("CConnectionImpl::FlushAllClientMessageFiles cTick=%d m_NextFlushFileTime=%d\n", cTick, m_NextFlushFileTime);
+
+	if (cTick > m_NextFlushFileTime)
+	{
+//	MPLOGS("CConnectionImpl::FlushAllClientMessageFiles flush timeout\n");
+		if (m_ClientList->size() != 0)
+		{
+			ClientList::iterator iter;
+			for (iter = m_ClientList->begin(); iter != m_ClientList->end(); iter++)
+			{
+				CMessageFile* file = (*iter)->m_MessageFile;
+				if (file != NULL)
+				{
+//	MPLOGS("CConnectionImpl::FlushAllClientMessageFiles flush client\n");
+					file->FlushFile();
+				}
+			}
+		}
+		m_NextFlushFileTime = GetTickCount() + FLUSH_TIME;		
+	}
+}
+
+BOOL CConnectionImpl::CreateCommProtocols(const char* commPath, const char* protPath)
+{
+	BOOL loaded = FALSE;
+
+	TCDEBUGOPEN();
+	TCDEBUGLOGS("CConnectionImpl::CreateCommProtocols\n");
+
+	TCDEBUGLOGA2(" commPath=%s protPath=%s\n", commPath, protPath);
+
+	m_BaseCommHandle = ::LoadLibrary(commPath);
+	m_BaseProtocolHandle = ::LoadLibrary(protPath);
+	if (m_BaseCommHandle == NULL || m_BaseProtocolHandle == NULL)
+	{
+		TCDEBUGLOGA2(" error loading library, m_BaseCommHandle=%x m_BaseProtocolHandle=%x\n", m_BaseCommHandle, m_BaseProtocolHandle);
+		if (m_BaseCommHandle) ::FreeLibrary(m_BaseCommHandle); m_BaseCommHandle = NULL;
+		if (m_BaseProtocolHandle) ::FreeLibrary(m_BaseProtocolHandle); m_BaseProtocolHandle = NULL;
+
+	}
+	else
+	{
+		COMMCREATE lpCommFn = (COMMCREATE)::GetProcAddress(m_BaseCommHandle, COMMCREATE_FNNAME);
+		PROTOCOLCREATE lpProtFn = (PROTOCOLCREATE)::GetProcAddress(m_BaseProtocolHandle, PROTOCOLCREATE_FNNAME);
+		if (lpCommFn == NULL || lpProtFn == NULL)
+		{
+			TCDEBUGLOGA2(" error finding function, lpCommFn=%x lpProtFn=%x\n", lpCommFn, lpProtFn);
+			if (m_BaseCommHandle) ::FreeLibrary(m_BaseCommHandle); m_BaseCommHandle = NULL;
+			if (m_BaseProtocolHandle) ::FreeLibrary(m_BaseProtocolHandle); m_BaseProtocolHandle = NULL;
+		}
+		else
+		{
+			m_BaseProtocol = lpProtFn();
+			if (m_BaseProtocol == NULL)
+			{
+				TCDEBUGLOGA1(" error creating protocol, m_BaseProtocol=%x\n", m_BaseProtocol);
+				if (m_BaseCommHandle) ::FreeLibrary(m_BaseCommHandle); m_BaseCommHandle = NULL;
+				if (m_BaseProtocolHandle) ::FreeLibrary(m_BaseProtocolHandle); m_BaseProtocolHandle = NULL;
+			}
+			else
+			{
+				m_BaseComm = lpCommFn(m_ConnectSettings, m_ConnectionID, m_BaseProtocol);
+				if (m_BaseComm == NULL)
+				{
+					TCDEBUGLOGA1(" error creating comm, m_BaseComm=%x\n", m_BaseComm);
+					if (m_BaseProtocol) delete m_BaseProtocol; m_BaseProtocol = NULL;
+
+					if (m_BaseCommHandle) ::FreeLibrary(m_BaseCommHandle); m_BaseCommHandle = NULL;
+					if (m_BaseProtocolHandle) ::FreeLibrary(m_BaseProtocolHandle); m_BaseProtocolHandle = NULL;
+				}
+				else
+				{
+					loaded = TRUE;
+					TCDEBUGLOGA4(" created class, m_BaseComm=%x m_BaseProtocol=%x m_BaseCommHandle=%x m_BaseProtocolHandle=%x\n", m_BaseComm, m_BaseProtocol, m_BaseCommHandle, m_BaseProtocolHandle);
+				}
+			}
+		}
+	}
+
+	TCDEBUGCLOSE();
+	return loaded;
+}