connectivity/com.nokia.tcf/native/TCFNative/TCFServer/ConnectionImpl.cpp
author fturovic <frank.turovich@nokia.com>
Wed, 16 Jun 2010 11:39:35 -0500
changeset 1475 fb0e02cb252b
parent 1473 6c45b7c9cdac
child 1481 c7f22cc57d44
permissions -rw-r--r--
minor edits and updated images

/*
* Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies).
* All rights reserved.
* This component and the accompanying materials are made available
* under the terms of the License "Eclipse Public License v1.0"
* which accompanies this distribution, and is available
* at the URL "http://www.eclipse.org/legal/epl-v10.html".
*
* Initial Contributors:
* Nokia Corporation - initial contribution.
*
* Contributors:
*
* Description: 
*
*/
// ConnectionImpl.cpp: implementation of the CConnectionImpl class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "ConnectionImpl.h"
#include "RegistryImpl.h"
#include "ServerManager.h"

#ifdef _DEBUG
extern BOOL gDoLogging;
#endif

// Connection-level debug logging helpers.
//
// These macros are only active in _DEBUG builds with LOG_CONNECTION defined;
// otherwise they expand to nothing.  They must be used inside non-static
// CConnectionImpl member functions because they reference this->m_DebugLog
// and this->m_DebugLogMsg.
//
// Each macro checks both the global gDoLogging switch AND that this object
// actually owns a log.  m_DebugLog is NULL when gDoLogging was off while the
// object was constructed, so checking gDoLogging alone would dereference a
// NULL pointer if logging is switched on later.
//
// NOTE(review): the format macros sprintf into m_DebugLogMsg without a length
// limit; callers must keep messages shorter than that buffer.
#define LOG_CONNECTION
#if defined(LOG_CONNECTION) && defined(_DEBUG)
#define TCDEBUGENABLED() (gDoLogging && this->m_DebugLog != NULL)
#define TCDEBUGOPEN() if (TCDEBUGENABLED()) { this->m_DebugLog->WaitForAccess(); }
#define TCDEBUGLOGS(s) if (TCDEBUGENABLED()) { sprintf(this->m_DebugLogMsg,"%s", s); this->m_DebugLog->log(this->m_DebugLogMsg); }
#define TCDEBUGLOGA1(s, a1) if (TCDEBUGENABLED()) { sprintf(this->m_DebugLogMsg, s, a1); this->m_DebugLog->log(this->m_DebugLogMsg); }
#define TCDEBUGLOGA2(s, a1, a2) if (TCDEBUGENABLED()) { sprintf(this->m_DebugLogMsg, s, a1, a2); this->m_DebugLog->log(this->m_DebugLogMsg); }
#define TCDEBUGLOGA3(s, a1, a2, a3) if (TCDEBUGENABLED()) { sprintf(this->m_DebugLogMsg, s, a1, a2, a3); this->m_DebugLog->log(this->m_DebugLogMsg); }
#define TCDEBUGLOGA4(s, a1, a2, a3, a4) if (TCDEBUGENABLED()) { sprintf(this->m_DebugLogMsg, s, a1, a2, a3, a4); this->m_DebugLog->log(this->m_DebugLogMsg); }
#define TCDEBUGCLOSE() if (TCDEBUGENABLED()) { this->m_DebugLog->ReleaseAccess(); }
#else
#define TCDEBUGOPEN()
#define TCDEBUGLOGS(s)
#define TCDEBUGLOGA1(s, a1)
#define TCDEBUGLOGA2(s, a1, a2)
#define TCDEBUGLOGA3(s, a1, a2, a3)
#define TCDEBUGLOGA4(s, a1, a2, a3, a4)
#define TCDEBUGCLOSE()
#endif

//#define LOG_MPROCESSOR

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////

// Default constructor: creates an empty, disconnected connection object.
// No settings, client list, registry, comm/protocol objects, events or
// thread are allocated; every pointer/handle member is set to NULL so the
// destructor can safely test and release them.
CConnectionImpl::CConnectionImpl()
{
	// The destructor deletes both debug logs, so they must never be left
	// uninitialized (the parameterized constructor sets them as well).
	m_DebugLog = NULL;
	m_DebugLog2 = NULL;

	m_ConnectSettings = NULL;
	m_ClientList = NULL;
	m_Status = eDisconnected;
	m_Registry = NULL;
	m_ConnectionID = 0;
	m_OsError = 0;
	m_BaseComm = NULL;
	m_BaseProtocol = NULL;
	m_BaseCommHandle = NULL;
	m_BaseProtocolHandle = NULL;

	// message processing thread flags and handles
	m_MessageProcessorState = MP_NONE;
	m_ExitMessageProcessor = false;
	m_PauseMessageProcessing = false;
	m_StartMessageProcessing = false;
	m_hMessageProcessorExittedEvent = NULL;
	m_hMessageProcessorStoppedEvent = NULL;
	m_hMessageProcessorStartedEvent = NULL;
	m_hMessageProcessorThread = NULL;
	m_dwMessageProcessorThreadId = 0;

	m_NextRetryTime = m_RetryTimeoutTime = 0;

	m_NextFlushFileTime = 0;
}

// Constructs a disconnected connection for the given settings and id.
//
// conData      - connection settings; copied byte-for-byte into a private
//                ConnectData owned by this object.
// connectionId - id used to name the debug logs, the registry and the
//                message-processor events.
//
// Allocates the client list and registry and creates the three named
// auto-reset events the message-processor thread signals.  The thread itself
// is not created here (see StartProcessing).
CConnectionImpl::CConnectionImpl(ConnectData conData, DWORD connectionId)
{
	// Debug logs exist only in _DEBUG builds with logging switched on.
	m_DebugLog = NULL;
	m_DebugLog2 = NULL;
#ifdef _DEBUG
	if (gDoLogging)
	{
#if defined(LOG_CONNECTION)
		m_DebugLog = new TCDebugLog("TCF_ConnectionLog", connectionId);
#endif
#if defined(LOG_MPROCESSOR)
		m_DebugLog2 = new TCDebugLog("TCF_ProcessorLog", connectionId);
#endif
	}
#endif

	TCDEBUGOPEN();
	TCDEBUGLOGA1("CConnectionImpl::CConnectionImpl id = %d\n", connectionId);

	// plain state
	m_Status = eDisconnected;
	m_ConnectionID = connectionId;
	m_OsError = 0;
	m_NextRetryTime = m_RetryTimeoutTime = 0;
	m_NextFlushFileTime = 0;

	// comm/protocol plug-ins are loaded later by CreateCommProtocols
	m_BaseComm = NULL;
	m_BaseProtocol = NULL;
	m_BaseCommHandle = NULL;
	m_BaseProtocolHandle = NULL;

	// message processing thread flags and handles
	m_MessageProcessorState = MP_NONE;
	m_ExitMessageProcessor = false;
	m_PauseMessageProcessing = false;
	m_StartMessageProcessing = false;
	m_hMessageProcessorThread = NULL;
	m_dwMessageProcessorThreadId = 0;

	// private copy of the caller's settings
	m_ConnectSettings = new ConnectData();
	memcpy(m_ConnectSettings, &conData, sizeof(ConnectData));

	m_ClientList = new ClientList();
	m_ClientList->clear();

	m_Registry = new CRegistryImpl(connectionId);

	// named auto-reset events, one per message-processor transition
	char nameBuffer[100];

	sprintf(nameBuffer, "%s%d", MESSAGEPROCESSOR_EXITEVENT_BASENAME, connectionId);
	m_hMessageProcessorExittedEvent = ::CreateEvent(NULL, FALSE, FALSE, nameBuffer);

	sprintf(nameBuffer, "%s%d", MESSAGEPROCESSOR_STOPEVENT_BASENAME, connectionId);
	m_hMessageProcessorStoppedEvent = ::CreateEvent(NULL, FALSE, FALSE, nameBuffer);

	sprintf(nameBuffer, "%s%d", MESSAGEPROCESSOR_STARTEVENT_BASENAME, connectionId);
	m_hMessageProcessorStartedEvent = ::CreateEvent(NULL, FALSE, FALSE, nameBuffer);

	TCDEBUGCLOSE();
}
// Destructor: forcibly terminates the message-processor thread if one is
// still recorded, closes the event handles, releases the owned objects and
// plug-in libraries, and finally deletes the debug logs.
CConnectionImpl::~CConnectionImpl()
{
	TCDEBUGOPEN();
	TCDEBUGLOGS("CConnectionImpl::~CConnectionImpl\n");

	// terminate the message processor thread if running
	if (m_hMessageProcessorThread != NULL)
	{
		::TerminateThread(m_hMessageProcessorThread, 0);
		::CloseHandle(m_hMessageProcessorThread);
	}

	// close whichever of the processor events were created
	HANDLE processorEvents[3];
	processorEvents[0] = m_hMessageProcessorExittedEvent;
	processorEvents[1] = m_hMessageProcessorStoppedEvent;
	processorEvents[2] = m_hMessageProcessorStartedEvent;
	for (int eventIndex = 0; eventIndex < 3; ++eventIndex)
	{
		if (processorEvents[eventIndex] != NULL)
			::CloseHandle(processorEvents[eventIndex]);
	}

	// deleting a NULL pointer is a no-op, so no guards are needed here
	delete m_ConnectSettings;

	if (m_ClientList)
	{
		m_ClientList->clear();
		delete m_ClientList;
	}

	delete m_Registry;

	// each plug-in object is destroyed before its library is unloaded
	delete m_BaseComm;
	if (m_BaseCommHandle)
		::FreeLibrary(m_BaseCommHandle);

	delete m_BaseProtocol;
	if (m_BaseProtocolHandle)
		::FreeLibrary(m_BaseProtocolHandle);

	TCDEBUGCLOSE();

	// the logs go last because the macros above still use them
	delete m_DebugLog;
	delete m_DebugLog2;
}

// Compares this connection with another connection object.
// Returns TRUE when both have the same connectType string and, if a comm
// object has been created, that comm object reports the other connection's
// settings as equal.  With no comm object, a matching type alone is enough.
BOOL CConnectionImpl::IsEqual(CConnection* connection)
{
	TCDEBUGOPEN();
	TCDEBUGLOGS("CConnectionImpl::IsEqual\n");

	BOOL equal = FALSE;

	const bool sameType =
		(strcmp(m_ConnectSettings->connectType, connection->m_ConnectSettings->connectType) == 0);

	if (sameType && (!m_BaseComm || m_BaseComm->IsConnectionEqual(connection->m_ConnectSettings)))
	{
		equal = TRUE;
	}

	TCDEBUGCLOSE();
	return equal;
}

// Compares this connection with a set of connection settings.
// Returns TRUE when the connectType strings match and, if a comm object has
// been created, that comm object reports the settings as equal.  With no
// comm object, a matching type alone is enough.
BOOL CConnectionImpl::IsEqual(pConnectData pConData)
{
	TCDEBUGOPEN();
	TCDEBUGLOGS("CConnectionImpl::IsEqual\n");

	BOOL equal = FALSE;

	const bool sameType = (strcmp(m_ConnectSettings->connectType, pConData->connectType) == 0);

	if (sameType && (!m_BaseComm || m_BaseComm->IsConnectionEqual(pConData)))
	{
		equal = TRUE;
	}

	TCDEBUGCLOSE();
	return equal;
}

// Opens the comm port and, on success, marks the connection as connected and
// starts the message processor.
// Returns TCAPI_ERR_NONE on success, TCAPI_ERR_UNKNOWN_MEDIA_TYPE when the
// comm or protocol object is missing, or the error returned by OpenPort
// (in which case m_OsError receives the comm object's last error).
long CConnectionImpl::DoConnect()
{
	TCDEBUGOPEN();
	TCDEBUGLOGS("CConnectionImpl::DoConnect\n");

	long ret = TCAPI_ERR_UNKNOWN_MEDIA_TYPE;

	if (m_BaseComm && m_BaseProtocol)
	{
		ret = m_BaseComm->OpenPort();
		if (ret != TCAPI_ERR_NONE)
		{
			m_OsError = m_BaseComm->m_lastCommError;
			TCDEBUGLOGA1(" m_BaseComm->OpenPort = %d\n", ret);
		}
	}

	if (ret != TCAPI_ERR_NONE)
	{
		// the comm object is deliberately kept alive after a failed open
		TCDEBUGCLOSE();
		return ret;
	}

	m_Status = eConnected;

	// release the log before StartProcessing, which opens it again itself
	TCDEBUGCLOSE();

	StartProcessing();
	return ret;
}

// Closes the comm port if the connection is currently connected and marks
// the connection as disconnected in every case.
// Returns the result of ClosePort, or TCAPI_ERR_NONE when nothing was open.
long CConnectionImpl::DoDisconnect()
{
	TCDEBUGOPEN();
	TCDEBUGLOGS("CConnectionImpl::DoDisconnect\n");

	// the comm object itself is kept; only its port is closed
	const long closeResult = IsConnected() ? m_BaseComm->ClosePort() : TCAPI_ERR_NONE;

	m_Status = eDisconnected;

	TCDEBUGCLOSE();
	return closeResult;
}

// Appends a client to this connection's client list.  Always returns TRUE.
BOOL CConnectionImpl::AddClient(CClient* client)
{
	TCDEBUGOPEN();
	TCDEBUGLOGS("CConnectionImpl::AddClient\n");

	m_ClientList->push_back(client);

	TCDEBUGCLOSE();
	return TRUE;
}

long CConnectionImpl::DoSendMessage(long encodeOption, BYTE protocolVersion, BOOL useMsgId, BYTE msgId, DWORD msgLength, BYTE* pMsg)
{
	TCDEBUGOPEN();
	TCDEBUGLOGS("CConnectionImpl::DoSendMessage\n");

	long err = TCAPI_ERR_NONE;
	if (IsRetryInProgress())
	{
		err = TCAPI_ERR_COMM_RETRY_IN_PROGRESS;
	}
	else if (IsRetryTimedOut())
	{
		err = TCAPI_ERR_COMM_TIMEOUT;
	}
	else if (m_Status == eConnected)
	{
		BYTE* encodedMessage = new BYTE[msgLength + 40]; // add enough for header (msgLength may be 0)
		// if msgLength == 0, then encodeOption SHOULD be ENCODE_FORMAT since com expects to send something!
		if (encodeOption == ENCODE_FORMAT)
		{
#ifdef _DEBUG
			char msg[200]; msg[0] = '\0';
			int len = (msgLength > 30) ? 30 : msgLength;
			for (int i = 0; i < len; i ++)
			{
				sprintf(msg, "%s%02.2x ", msg, pMsg[i]);
			}
			sprintf(msg, "%s\n", msg);
			TCDEBUGLOGS(msg);
#endif
			// msgLength maybe 0 and pMsg maybe NULL (we're not sending a raw message, just a protocol header)
			msgLength = m_BaseProtocol->EncodeMessage(pMsg, msgLength, protocolVersion, msgId, encodedMessage, msgLength+40);
#ifdef _DEBUG
			msg[0] = '\0';
			len = (msgLength > 30) ? 30 : msgLength;
			for (i = 0; i < len; i ++)
			{
				sprintf(msg, "%s%02.2x ", msg, encodedMessage[i]);
			}
			sprintf(msg, "%s\n", msg);
			TCDEBUGLOGS(msg);
#endif
			err = m_BaseComm->SendDataToPort(msgLength, encodedMessage);
		}
		else
		{
#ifdef _DEBUG
			char msg[200]; msg[0] = '\0';
			int len = (msgLength > 30) ? 30 : msgLength;
			for (int i = 0; i < len; i ++)
			{
				sprintf(msg, "%s%02.2x ", msg, pMsg[i]);
			}
			sprintf(msg, "%s\n", msg);
			TCDEBUGLOGS(msg);
#endif
			// msgLength != 0 and pMsg != NULL
			err = m_BaseComm->SendDataToPort(msgLength, pMsg);
		}
		delete[] encodedMessage;

		TCDEBUGLOGS("CConnectionImpl::DoSendMessage done\n");
		if (err == TCAPI_ERR_COMM_ERROR)
		{
//			EnterRetryPeriod(err, true, m_BaseComm->m_lastCommError);
			HandleFatalPortError(err, true, m_BaseComm->m_lastCommError);
			m_OsError = m_BaseComm->m_lastCommError;
		}
	}
	else
	{
		err = TCAPI_ERR_MEDIA_NOT_OPEN;
	}

	TCDEBUGLOGA1("CConnectionImpl::DoSendMessage err = %d\n", err);
	TCDEBUGCLOSE();
	return err;
}

// Drives the reconnect/retry state of the connection; intended to be called
// repeatedly (the message-processor thread does so while a retry is in
// progress).
//
// Returns:
//   TCAPI_ERR_MEDIA_NOT_OPEN          - no comm object exists
//   TCAPI_ERR_NONE                    - no retry is pending, or the port was
//                                       just reopened successfully
//   TCAPI_ERR_COMM_TIMEOUT            - the retry window has expired
//   TCAPI_ERR_COMM_RETRY_IN_PROGRESS  - still waiting, or the reopen failed
long CConnectionImpl::DoRetryProcessing()
{
	// without a comm object there is nothing to retry against
	if (m_BaseComm == NULL /*|| m_BaseComm->IsConnected() == false*/)
		return TCAPI_ERR_MEDIA_NOT_OPEN;

	// neither retrying nor timed out: nothing to do
	if (!IsRetryInProgress() && !IsRetryTimedOut())
		return TCAPI_ERR_NONE;

	// timeout was already flagged on an earlier call
	if (IsRetryTimedOut())
		return TCAPI_ERR_COMM_TIMEOUT;

	time_t now;
	time(&now);

	// the whole retry window has expired: give up
	if (now >= m_RetryTimeoutTime)
	{
		TCDEBUGOPEN();
		TCDEBUGLOGS("CConnectionImpl::DoRetryProcessing retry timeout\n");
		TCDEBUGCLOSE();

		NotifyClientsCommError(TCAPI_ERR_COMM_TIMEOUT);
		m_BaseComm->ClosePort();
		SetRetryTimedOut();
		return TCAPI_ERR_COMM_TIMEOUT;
	}

	// next attempt is not due yet
	if (now < m_NextRetryTime)
		return TCAPI_ERR_COMM_RETRY_IN_PROGRESS;

	TCDEBUGOPEN();
	TCDEBUGLOGS("CConnectionImpl::DoRetryProcessing retry time\n");
	TCDEBUGCLOSE();

	// an attempt is due: cycle the port
	m_BaseComm->ClosePort();
	const int reopenResult = m_BaseComm->OpenPort();

	if (reopenResult == TCAPI_ERR_NONE)
	{
		TCDEBUGOPEN();
		TCDEBUGLOGS("CConnectionImpl::DoRetryProcessing reconnected\n");
		TCDEBUGCLOSE();

		// tell every client the link is back, then leave the retry state
		NotifyClientsCommError(TCAPI_INFO_COMM_RECONNECTED);
		SetConnected();
		return TCAPI_ERR_NONE;
	}

	// reopen failed: schedule the next attempt and remember the OS error
	m_NextRetryTime = now + m_ConnectSettings->retryInterval;
	m_OsError = m_BaseComm->m_lastCommError;
	return TCAPI_ERR_COMM_RETRY_IN_PROGRESS;
}
// Puts the connection into the retry state after a comm error.
//
// commErr   - error code reported to every client.
// passOsErr - forwarded to the clients' error monitors with osErr.
// osErr     - OS error code accompanying commErr.
//
// Schedules the next retry and the overall retry deadline from the
// retryInterval / retryTimeout connection settings, notifies all clients and
// sets the retry-in-progress flag.  Always returns TCAPI_ERR_NONE.
long CConnectionImpl::EnterRetryPeriod(long commErr, bool passOsErr, DWORD osErr)
{
	TCDEBUGOPEN();
	TCDEBUGLOGA3("CConnectionImpl::EnterRetryPeriod commErr=%d passOsErr=%d osErr=%d\n", commErr, passOsErr, osErr);
	TCDEBUGCLOSE();

	// both deadlines are measured from the same instant
	time_t now;
	time(&now);
	m_NextRetryTime = now + m_ConnectSettings->retryInterval;
	m_RetryTimeoutTime = now + m_ConnectSettings->retryTimeout;

	NotifyClientsCommError(commErr, passOsErr, osErr);
	SetRetryInProgress();

	return TCAPI_ERR_NONE;
}

// Removes the first client in the list whose id equals the id of 'client'.
// Returns TRUE when a client was removed, FALSE when no id matched.
BOOL CConnectionImpl::RemoveClient(CClient* client)
{
	TCDEBUGOPEN();
	TCDEBUGLOGS("CConnectionImpl::RemoveClient\n");

	BOOL removed = FALSE;

	ClientList::iterator position = m_ClientList->begin();
	// 'removed' is tested first so an erased iterator is never compared
	while (!removed && position != m_ClientList->end())
	{
		if ((*position)->GetClientId() == client->GetClientId())
		{
			m_ClientList->erase(position);
			removed = TRUE;
		}
		else
		{
			position++;
		}
	}

	TCDEBUGCLOSE();
	return removed;
}

// Asks the message-processor thread to exit, waits (bounded by
// MESSAGEPROCESSOR_EVENTWAIT_TIMEOUT) for its exit event and closes the
// thread handle.  Does nothing when no thread exists.  Always returns TRUE.
BOOL CConnectionImpl::ExitProcessing()
{
	TCDEBUGOPEN();
	TCDEBUGLOGS("CConnectionImpl::ExitProcessing\n");

	const bool threadExists = (m_hMessageProcessorThread != NULL);
	if (threadExists)
	{
		// request the exit transition
		m_MessageProcessorState = MP_EXIT;

		m_StartMessageProcessing = false;
		m_PauseMessageProcessing = true;
		m_ExitMessageProcessor = true;

		// the handle is closed whether or not the wait succeeded
		const DWORD waitResult = ::WaitForSingleObject(m_hMessageProcessorExittedEvent, MESSAGEPROCESSOR_EVENTWAIT_TIMEOUT);
		TCDEBUGLOGA1("CConnectionImpl::ExitProcessing waitStatus=%x\n", waitResult);

		::CloseHandle(m_hMessageProcessorThread);
		m_hMessageProcessorThread = NULL;
	}

	TCDEBUGCLOSE();
	return TRUE;
}

// Creates the message-processor thread if it does not exist yet, then puts
// it into the paused state.  Returns the result of PauseProcessing.
BOOL CConnectionImpl::StartProcessing()
{
	TCDEBUGOPEN();
	TCDEBUGLOGS("CConnectionImpl::StartProcessing\n");

	if (m_hMessageProcessorThread == NULL)
	{
		// the new thread starts out with a pause request pending
		m_MessageProcessorState = MP_PAUSE;

		m_ExitMessageProcessor = false;
		m_StartMessageProcessing = false;
		m_PauseMessageProcessing = false;

		// MessageProcessor receives this object as its parameter
		LPTHREAD_START_ROUTINE threadEntry = (LPTHREAD_START_ROUTINE) MessageProcessor;
		m_hMessageProcessorThread = ::CreateThread(NULL, 0, threadEntry, this, 0, &m_dwMessageProcessorThreadId);
	}

	TCDEBUGCLOSE();

	// processing stays paused until RestartProcessing is called
	return PauseProcessing();
}

// Requests the message-processor thread to pause and waits (bounded by
// MESSAGEPROCESSOR_EVENTWAIT_TIMEOUT) for its "stopped" event.
// Does nothing when no thread exists.  Always returns TRUE.
BOOL CConnectionImpl::PauseProcessing()
{
	TCDEBUGOPEN();
	TCDEBUGLOGS("CConnectionImpl::PauseProcessing\n");

	const bool threadExists = (m_hMessageProcessorThread != NULL);
	if (threadExists)
	{
		// request the pause transition
		m_MessageProcessorState = MP_PAUSE;

		m_ExitMessageProcessor = false;
		m_StartMessageProcessing = false;
		m_PauseMessageProcessing = true;

		const DWORD waitResult = ::WaitForSingleObject(m_hMessageProcessorStoppedEvent, MESSAGEPROCESSOR_EVENTWAIT_TIMEOUT);
		TCDEBUGLOGA1("CConnectionImpl::PauseProcessing waitStatus=%x\n", waitResult);
	}

	TCDEBUGCLOSE();
	return TRUE;
}

// Requests the message-processor thread to (re)start processing and waits
// (bounded by MESSAGEPROCESSOR_EVENTWAIT_TIMEOUT) for its "started" event.
// Does nothing when no thread exists.  Always returns TRUE.
BOOL CConnectionImpl::RestartProcessing()
{
	TCDEBUGOPEN();
	TCDEBUGLOGS("CConnectionImpl::RestartProcessing\n");

	const bool threadExists = (m_hMessageProcessorThread != NULL);
	if (threadExists)
	{
		// request the start transition
		m_MessageProcessorState = MP_START;

		m_ExitMessageProcessor = false;
		m_StartMessageProcessing = true;
		m_PauseMessageProcessing = false;

		const DWORD waitResult = ::WaitForSingleObject(m_hMessageProcessorStartedEvent, MESSAGEPROCESSOR_EVENTWAIT_TIMEOUT);
		TCDEBUGLOGA1("CConnectionImpl::RestartProcessing waitStatus=%x\n", waitResult);
	}

	TCDEBUGCLOSE();
	return TRUE;
}

// Forwards to the registry: removes the client's registrations.
// Returns whatever the registry's RemoveClient returns.
BOOL CConnectionImpl::RemoveClientFromRegistry(CClient* client)
{
	TCDEBUGOPEN();
	TCDEBUGLOGS("CConnectionImpl::RemoveClientFromRegistry\n");
	TCDEBUGCLOSE();

	// the log is released before the registry call is made
	const BOOL removed = m_Registry->RemoveClient(client);
	return removed;
}

// Forwards to the registry: registers the client for the given ids.
// client    - client to register.
// numberIds - number of entries in 'ids'.
// ids       - array of ids passed through to the registry.
// Returns whatever the registry's AddClient returns.
BOOL CConnectionImpl::AddClientToRegistry(CClient* client, long numberIds, BYTE* ids)
{
	TCDEBUGOPEN();
	TCDEBUGLOGS("CConnectionImpl::AddClientToRegistry\n");
	TCDEBUGCLOSE();

	// the log is released before the registry call is made
	const BOOL added = m_Registry->AddClient(client, numberIds, ids);
	return added;
}

// Handles an unrecoverable comm port error: closes the port, marks the
// connection as disconnected and reports the error to every client.
//
// err       - TCF error code reported to the clients.
// passOsErr - whether osErr should be passed along to the clients.
// osErr     - OS error code accompanying err.
//
// Always returns TCAPI_ERR_NONE.
long CConnectionImpl::HandleFatalPortError(long err, bool passOsErr, DWORD osErr)
{
	TCDEBUGOPEN();
	TCDEBUGLOGA3("CConnectionImpl::HandleFatalPortError err=%d passOsErr=%d osErr=%d\n", err, passOsErr, osErr);
	TCDEBUGCLOSE();

	m_BaseComm->ClosePort();
	m_Status = eDisconnected;

	// Forward the OS error details as EnterRetryPeriod does; previously they
	// were logged here but never delivered to the clients' error monitors.
	NotifyClientsCommError(err, passOsErr, osErr);

	return TCAPI_ERR_NONE;
}
void CConnectionImpl::NotifyClientsCommError(long tcfError, bool passOsError, DWORD osError)
{
//	TCDEBUGOPEN();
//	TCDEBUGLOGS("CConnectionImpl::NotifyClientsCommError\n");
//	TCDEBUGCLOSE();

	if (m_ClientList->size() != 0)
	{
		ClientList::iterator iter;
		for (iter = m_ClientList->begin(); iter != m_ClientList->end(); iter++)
		{
			CErrorMonitor* errorMonitor = (*iter)->m_ErrorMonitor;
			errorMonitor->PutError(tcfError, passOsError, osError);
		}
	}
}
// Returns TRUE when a comm object exists and reports that it has a version,
// FALSE otherwise.
BOOL CConnectionImpl::HasVersion()
{
	return (m_BaseComm && m_BaseComm->HasVersion()) ? TRUE : FALSE;
}
// Asks the comm object to fill 'version' when HasVersion() is true;
// otherwise 'version' is left untouched.
void CConnectionImpl::GetVersion(char* version)
{
	if (!HasVersion())
		return;

	m_BaseComm->GetVersion(version);
}

void CConnectionImpl::UnLockAllDestinations()
{
	if (m_ClientList->size() != 0)
	{
		ClientList::iterator iter;
		for (iter = m_ClientList->begin(); iter != m_ClientList->end(); iter++)
		{
			CInputStream* inputStream = (*iter)->m_InputStream;
			CMessageFile* file = (*iter)->m_MessageFile;
			if (inputStream != NULL) 
			{
				inputStream->UnLockStream();
			}
			else if (file != NULL)
			{
				file->UnLockMessageFile();
			}
		}
	}
}

// Message-processor debug logging helpers.
//
// Active only in _DEBUG builds with LOG_MPROCESSOR defined; otherwise they
// expand to nothing.  They expect a local 'pThis' pointing at the
// CConnectionImpl whose m_DebugLog2 / m_DebugLogMsg2 are used.
//
// Each macro checks both the global gDoLogging switch AND that the object
// actually owns a processor log: m_DebugLog2 is NULL when gDoLogging was off
// at construction time, so checking gDoLogging alone could dereference NULL.
#if defined(LOG_MPROCESSOR) && defined(_DEBUG)
#define MPLOGENABLED() (gDoLogging && pThis->m_DebugLog2 != NULL)
#define MPLOGOPEN() if (MPLOGENABLED()) { pThis->m_DebugLog2->WaitForAccess(); }
#define MPLOGS(s) if (MPLOGENABLED()) { sprintf(pThis->m_DebugLogMsg2,"%s", s); pThis->m_DebugLog2->log(pThis->m_DebugLogMsg2); }
#define MPLOGA1(s, a1) if (MPLOGENABLED()) { sprintf(pThis->m_DebugLogMsg2, s, a1); pThis->m_DebugLog2->log(pThis->m_DebugLogMsg2); }
#define MPLOGA2(s, a1, a2) if (MPLOGENABLED()) { sprintf(pThis->m_DebugLogMsg2, s, a1, a2); pThis->m_DebugLog2->log(pThis->m_DebugLogMsg2); }
#define MPLOGA3(s, a1, a2, a3) if (MPLOGENABLED()) { sprintf(pThis->m_DebugLogMsg2, s, a1, a2, a3); pThis->m_DebugLog2->log(pThis->m_DebugLogMsg2); }
#define MPLOGCLOSE() if (MPLOGENABLED()) { pThis->m_DebugLog2->ReleaseAccess(); }
#else
#define MPLOGOPEN()
#define MPLOGS(s)
#define MPLOGA1(s, a1)
#define MPLOGA2(s, a1, a2)
#define MPLOGA3(s, a1, a2, a3)
#define MPLOGCLOSE()
#endif

// Thread entry point for the message processor (passed to CreateThread in
// StartProcessing).
//
// lpParam - the CConnectionImpl instance this thread serves.
//
// The loop runs until m_MessageProcessorState becomes MP_EXIT and reacts to
// state requests written by the Pause/Restart/ExitProcessing methods:
//   MP_PAUSE - stop polling, reset state to MP_NONE, signal the stopped event
//   MP_START - start polling, set state to MP_PROCESSING, signal the started event
//   MP_EXIT  - leave the loop, reset state to MP_NONE, signal the exitted event
// While processing, it polls the comm port, lets the comm object dispatch
// received data to clients, unlocks client destinations and periodically
// flushes client message files.
//
// Note that 'err' is not reset at the top of the loop: once it holds an
// error it is only overwritten by the retry checks or by a later poll, so
// after a fatal port error (with no retry state active) the loop only sleeps.
//
// Always returns 0.
DWORD WINAPI CConnectionImpl::MessageProcessor(LPVOID lpParam)
{
	CConnectionImpl* pThis = (CConnectionImpl*)lpParam;

	// the processor log is held for the whole lifetime of the thread
	MPLOGOPEN();
	MPLOGS("MessageProcessor start thread\n");

	bool processing = false;	// true between an MP_START and the next MP_PAUSE
	long err = TCAPI_ERR_NONE;	// last error; persists across iterations
	DWORD pollSize = 0;		// filled in by PollPort

	while (pThis->m_MessageProcessorState != MP_EXIT)
	{
		// honour a pending pause request and acknowledge it to the requester
		if (pThis->m_MessageProcessorState == MP_PAUSE)
		{
			MPLOGS("MessageProcessor pause\n");

			processing = false; 
			pThis->m_PauseMessageProcessing = false;
			pThis->m_MessageProcessorState = MP_NONE;
			BOOL ok = ::SetEvent(pThis->m_hMessageProcessorStoppedEvent);
		}

		// retry handling takes priority over normal polling
		if (pThis->IsRetryInProgress())
			err = pThis->DoRetryProcessing();
		else if (pThis->IsRetryTimedOut())
			err = TCAPI_ERR_COMM_TIMEOUT;

		if (processing && err == TCAPI_ERR_NONE)
		{
			if (pThis->m_BaseComm && pThis->m_BaseComm->IsConnected())
			{
				err = pThis->m_BaseComm->PollPort(pollSize);
				MPLOGA2("MessageProcessor PollPort = %d pollsize = %d\n", err, pollSize);
				if (err != TCAPI_ERR_NONE)
				{
					MPLOGA2("MessageProcessor  err = %d osError = %d\n", err, pThis->m_BaseComm->m_lastCommError);
//					pThis->EnterRetryPeriod(err, true, pThis->m_BaseComm->m_lastCommError);
					// any poll error is treated as fatal: close the port and notify clients
					pThis->HandleFatalPortError(err, true, pThis->m_BaseComm->m_lastCommError);
				}
				else
				{
					if (pollSize == 0)
					{
						// nothing received; yield briefly before polling again
						Sleep(1);
					}
					else
					{
						long numberProcessed = 0;
//						MPLOGA1("MessageProcessor ProcessBuffer pRegistry = %x\n", pThis->m_Registry);
						err = pThis->m_BaseComm->ProcessBuffer(pThis, pThis->m_Registry, numberProcessed);

						MPLOGA2("MessageProcessor ProcessBuffer err = %d number = %d\n", err, numberProcessed);

						if (err == TCAPI_ERR_COMM_ERROR)
						{
							MPLOGA2("MessageProcessor  err = %d osError = %d\n", err, pThis->m_BaseComm->m_lastCommError);
							// for this error we have os error, but we probably caught this in PollPort already
//							pThis->EnterRetryPeriod(err, true, pThis->m_BaseComm->m_lastCommError);
							pThis->HandleFatalPortError(err, true, pThis->m_BaseComm->m_lastCommError);
						}
						else if (err != TCAPI_ERR_NONE)
						{
							// all clients already notified in ProcessBuffer
							// clear the error so polling continues on the next iteration
							err = TCAPI_ERR_NONE;
						}
						pThis->UnLockAllDestinations(); // unlock all input streams, if they became locked during AddMessage()
//						Sleep(1);
					}
				}
//				MPLOGS("MessageProcessor FlushAllClientMessageFiles\n");
				// runs on every connected iteration; the callee rate-limits the actual flush
				pThis->FlushAllClientMessageFiles();
			}
			else
			{
				// basecom not connected
				Sleep(1);
			}
		}
		else
		{
			// processing is not being done
			Sleep(1);
		}
		// honour a pending start request and acknowledge it to the requester
		if (pThis->m_MessageProcessorState == MP_START)
		{
			MPLOGS("MessageProcessor start\n");

			processing = true;
			pThis->m_StartMessageProcessing = false;
			pThis->m_MessageProcessorState = MP_PROCESSING;
			BOOL ok = ::SetEvent(pThis->m_hMessageProcessorStartedEvent);
		}
	}
	// signal we're stopping
	pThis->m_ExitMessageProcessor = false;
	pThis->m_MessageProcessorState = MP_NONE;
	::SetEvent(pThis->m_hMessageProcessorExittedEvent);

	MPLOGS("MessageProcessor exit thread\n");
	MPLOGCLOSE();

	return 0;
}

void CConnectionImpl::FlushAllClientMessageFiles()
{
	DWORD cTick = GetTickCount();

//	MPLOGA2("CConnectionImpl::FlushAllClientMessageFiles cTick=%d m_NextFlushFileTime=%d\n", cTick, m_NextFlushFileTime);

	if (cTick > m_NextFlushFileTime)
	{
//	MPLOGS("CConnectionImpl::FlushAllClientMessageFiles flush timeout\n");
		if (m_ClientList->size() != 0)
		{
			ClientList::iterator iter;
			for (iter = m_ClientList->begin(); iter != m_ClientList->end(); iter++)
			{
				CMessageFile* file = (*iter)->m_MessageFile;
				if (file != NULL)
				{
//	MPLOGS("CConnectionImpl::FlushAllClientMessageFiles flush client\n");
					file->FlushFile();
				}
			}
		}
		m_NextFlushFileTime = GetTickCount() + FLUSH_TIME;		
	}
}

// Loads the comm and protocol plug-in libraries and creates one object from
// each through their exported factory functions.
//
// commPath - path of the comm library.
// protPath - path of the protocol library.
//
// Returns TRUE when both libraries loaded, both factory functions were found
// and both objects were created.  On any failure returns FALSE with both
// library handles freed and reset to NULL (a protocol object created before
// a comm-creation failure is deleted as well).
BOOL CConnectionImpl::CreateCommProtocols(const char* commPath, const char* protPath)
{
	BOOL loaded = FALSE;

	TCDEBUGOPEN();
	TCDEBUGLOGS("CConnectionImpl::CreateCommProtocols\n");

	TCDEBUGLOGA2(" commPath=%s protPath=%s\n", commPath, protPath);

	m_BaseCommHandle = ::LoadLibrary(commPath);
	m_BaseProtocolHandle = ::LoadLibrary(protPath);

	if (m_BaseCommHandle == NULL || m_BaseProtocolHandle == NULL)
	{
		TCDEBUGLOGA2(" error loading library, m_BaseCommHandle=%x m_BaseProtocolHandle=%x\n", m_BaseCommHandle, m_BaseProtocolHandle);
	}
	else
	{
		// both libraries are present: look up their factory functions
		COMMCREATE createComm = (COMMCREATE)::GetProcAddress(m_BaseCommHandle, COMMCREATE_FNNAME);
		PROTOCOLCREATE createProtocol = (PROTOCOLCREATE)::GetProcAddress(m_BaseProtocolHandle, PROTOCOLCREATE_FNNAME);

		if (createComm == NULL || createProtocol == NULL)
		{
			TCDEBUGLOGA2(" error finding function, lpCommFn=%x lpProtFn=%x\n", createComm, createProtocol);
		}
		else if ((m_BaseProtocol = createProtocol()) == NULL)
		{
			TCDEBUGLOGA1(" error creating protocol, m_BaseProtocol=%x\n", m_BaseProtocol);
		}
		else if ((m_BaseComm = createComm(m_ConnectSettings, m_ConnectionID, m_BaseProtocol)) == NULL)
		{
			TCDEBUGLOGA1(" error creating comm, m_BaseComm=%x\n", m_BaseComm);

			// the protocol object is useless without its comm object
			delete m_BaseProtocol;
			m_BaseProtocol = NULL;
		}
		else
		{
			loaded = TRUE;
			TCDEBUGLOGA4(" created class, m_BaseComm=%x m_BaseProtocol=%x m_BaseCommHandle=%x m_BaseProtocolHandle=%x\n", m_BaseComm, m_BaseProtocol, m_BaseCommHandle, m_BaseProtocolHandle);
		}
	}

	// shared failure cleanup: unload whichever libraries were loaded
	if (!loaded)
	{
		if (m_BaseCommHandle)
			::FreeLibrary(m_BaseCommHandle);
		m_BaseCommHandle = NULL;

		if (m_BaseProtocolHandle)
			::FreeLibrary(m_BaseProtocolHandle);
		m_BaseProtocolHandle = NULL;
	}

	TCDEBUGCLOSE();
	return loaded;
}