connectivity/com.nokia.tcf/native/TCFNative/Common/Source/InputStream.cpp
changeset 60 9d2210c8eed2
child 1481 c7f22cc57d44
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/connectivity/com.nokia.tcf/native/TCFNative/Common/Source/InputStream.cpp	Mon Apr 06 15:18:48 2009 -0500
@@ -0,0 +1,1044 @@
+/*
+* Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies).
+* All rights reserved.
+* This component and the accompanying materials are made available
+* under the terms of the License "Eclipse Public License v1.0"
+* which accompanies this distribution, and is available
+* at the URL "http://www.eclipse.org/legal/epl-v10.html".
+*
+* Initial Contributors:
+* Nokia Corporation - initial contribution.
+*
+* Contributors:
+*
+* Description: 
+*
+*/
+
+#include "stdafx.h"
+#include "InputStream.h"
+#include <stdio.h>
+#include <sys/stat.h>
+
+#ifdef TCF_CLIENT
+#include "..\..\TCFClient\ClientManager.h"
+extern CClientManager* gManager;
+#endif
+#ifdef TCF_SERVER
+#include "..\..\TCFServer\ServerManager.h"
+extern CServerManager* gManager;
+//#define LOG_PERFORMANCE
+#endif
+
+#ifdef _DEBUG
+extern BOOL gDoLogging;
+#endif
+
+// for performance - independent of debug logging
+//#define LOG_PERFORMANCE
+#ifdef LOG_PERFORMANCE
+# ifdef TCF_CLIENT
+static char* perfFileName="c:\\tcf\\clientperf.txt";
+# else
+static char* perfFileName="c:\\tcf\\serverperf.txt";
+# endif
+static FILE *fpLog = NULL;
+static int numLogged=0;
+static void logPerf(char* msg);
+static void openPerf();
+static void closePerf();
+#define OPENPERF() openPerf()
+#define LOGPERF(s) logPerf(s)
+#define CLOSEPERF() closePerf()
+#else
+#define OPENPERF()
+#define LOGPERF(s)
+#define CLOSEPERF()
+#endif
+
+//#define LOG_INPUTSTREAM
+#if defined(LOG_INPUTSTREAM) && defined(_DEBUG)
+extern char TCDebugMsg[];
+#define TCDEBUGOPEN() if (gDoLogging) { gManager->m_DebugLog->WaitForAccess(); }
+#define TCDEBUGLOGS(s) if (gDoLogging) { sprintf(TCDebugMsg,"%s", s); gManager->m_DebugLog->log(TCDebugMsg); }
+#define TCDEBUGLOGA1(s, a1) if (gDoLogging) { sprintf(TCDebugMsg, s, a1); gManager->m_DebugLog->log(TCDebugMsg); }
+#define TCDEBUGLOGA2(s, a1, a2) if (gDoLogging) { sprintf(TCDebugMsg, s, a1, a2); gManager->m_DebugLog->log(TCDebugMsg); }
+#define TCDEBUGLOGA3(s, a1, a2, a3) if (gDoLogging) { sprintf(TCDebugMsg, s, a1, a2, a3); gManager->m_DebugLog->log(TCDebugMsg); }
+#define TCDEBUGCLOSE() if (gDoLogging) { gManager->m_DebugLog->ReleaseAccess(); }
+#else
+#define TCDEBUGOPEN()
+#define TCDEBUGLOGS(s)
+#define TCDEBUGLOGA1(s, a1)
+#define TCDEBUGLOGA2(s, a1, a2)
+#define TCDEBUGLOGA3(s, a1, a2, a3)
+#define TCDEBUGCLOSE()
+#endif
+
+CInputStream::CInputStream()
+{
+}
+
+CInputStream::CInputStream(CHAR* pOverflowPath, DWORD inBufferSize, BOOL inOverFlowToFile, long inClientID)
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGA2("CInputStream::CInputStream clientId = %d this = %x\n", inClientID, this);
+
+	m_ClientID = inClientID;
+	m_BufferSize = inBufferSize;
+	m_FileSize = INPUTSTREAMOVERFLOW_FILE_SIZE;
+//	m_FileSize = 2*1024L;				for testing only
+
+	if (pOverflowPath == NULL)
+	{
+		m_OverFlowBaseName[0] = NULL;
+		m_OverFlowToFile = FALSE;
+	}
+	else
+	{
+		strncpy(m_OverFlowBaseName, pOverflowPath, MAX_FILEPATH);
+		m_OverFlowToFile = inOverFlowToFile;
+	}
+
+	m_File = NULL;
+
+	m_StreamLocked = FALSE;
+
+	TCDEBUGLOGS("CInputStream::CInputStream done\n");
+	TCDEBUGCLOSE();
+}
+
+CInputStream::~CInputStream()
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGA2("CInputStream::~CInputStream clientId = %d this = %x\n", m_ClientID, this);
+
+	m_Mutex.Close();
+	m_Data.Close();
+	m_Info.Close();
+
+	if (m_File != NULL)
+	{
+		m_File->Close();
+		delete m_File;
+	}
+
+	TCDEBUGLOGS("CInputStream::~CInputStream done\n");
+	TCDEBUGCLOSE();
+
+	CLOSEPERF();
+}
+
+BOOL CInputStream::CreateStream()
+{
+	BOOL ok = TRUE;
+#if (0)
+#ifdef _DEBUGLOG
+	#ifdef TCF_CLIENT
+		m_DebugLog = new TCDebugLog("TCF_ClientStreamLog", m_ClientID);
+	#else
+		m_DebugLog = new TCDebugLog("TCF_ServerStreamLog", m_ClientID);
+	#endif
+#else
+	m_DebugLog = NULL;
+#endif
+#endif
+	TCDEBUGOPEN();
+	TCDEBUGLOGA2("CInputStream::CreateStream clientId = %d this = %x\n", m_ClientID, this);
+
+	char toString[30];
+	sprintf(toString, "%s%04.4d", INPUTSTREAMDATA_MAP_BASENAME, m_ClientID);
+
+	TCDEBUGLOGA1("CInputStream::CreateStream dataname = %s\n", toString);
+
+	m_Data.Open(m_BufferSize, toString);
+	m_Data.Init();
+
+	sprintf(toString, "%s%04.4d", INPUTSTREAMDATA_MUTEX_BASENAME, m_ClientID);
+	TCDEBUGLOGA1("CInputStream::CreateStream mutexname = %s\n", toString);
+
+	m_Mutex.Open(toString, INPUTSTREAMDATA_MUTEX_TIMEOUT);
+
+	sprintf(toString, "%s%04.4d", INPUTSTREAMINFO_MAP_BASENAME, m_ClientID);
+	TCDEBUGLOGA1("CInputStream::CreateStream infoname = %s\n", toString);
+
+	m_Info.Open(INPUTSTREAMINFO_MAP_SIZE, toString);
+	m_Info.Init();
+#ifdef USE_CIRCULAR_BUFFER
+	GetInfoPtr()->bufferCapacity = m_BufferSize;
+#endif
+
+	// overflow file?
+	if (m_OverFlowToFile)
+	{
+		m_File = new CInputStreamFile();
+		m_File->SetClientId(m_ClientID);
+		if (!m_File->Open(m_FileSize, m_OverFlowBaseName))
+		{
+			delete m_File;
+			m_File = NULL;
+			ok = FALSE;
+		}
+	}
+	TCDEBUGLOGS("CInputStream::CreateStream done\n");
+	TCDEBUGCLOSE();
+
+	OPENPERF();
+
+	return ok;
+}
+#ifdef USE_CIRCULAR_BUFFER
+void CInputStream::DoReadBuffer(pInputStreamInfo pInfo, BYTE* pBuffer, BYTE* outData, DWORD inLength)
+{
+	DWORD lenToEnd = pInfo->bufferCapacity - pInfo->bufferRead;
+	if (lenToEnd > inLength)
+		lenToEnd = inLength;
+
+	if (lenToEnd > 0)
+		memcpy(outData, &pBuffer[pInfo->bufferRead], lenToEnd);
+
+	DWORD lenRemaining = inLength - lenToEnd;
+	if (lenRemaining > 0)
+		memcpy(&outData[lenToEnd], &pBuffer[0], lenRemaining);
+
+	pInfo->bufferSize -= inLength;
+
+	IncrementReadPosition(pInfo, inLength);
+}
+void CInputStream::DoWriteBuffer(pInputStreamInfo pInfo, BYTE* pBuffer, BYTE* inData, DWORD inLength)
+{
+	DWORD lenToEnd = pInfo->bufferCapacity - pInfo->bufferWrite;
+	if (lenToEnd > inLength)
+		lenToEnd = inLength;
+	memcpy(&pBuffer[pInfo->bufferWrite], inData, lenToEnd);
+	DWORD lenRemaining = inLength - lenToEnd;
+	if (lenRemaining > 0)
+		memcpy(&pBuffer[0], (const void*)&inData[lenToEnd], lenRemaining);
+
+	pInfo->bufferSize += inLength;
+
+	IncrementWritePosition(pInfo, inLength);
+}
+void CInputStream::IncrementReadPosition(pInputStreamInfo pInfo, DWORD inLength)
+{
+	DWORD lenToEnd = pInfo->bufferCapacity - pInfo->bufferRead;
+	if (inLength <= lenToEnd)
+		pInfo->bufferRead += inLength;
+	else
+		pInfo->bufferRead = inLength - lenToEnd;
+}
+void CInputStream::IncrementWritePosition(pInputStreamInfo pInfo, DWORD inLength)
+{
+	DWORD lenToEnd = pInfo->bufferCapacity - pInfo->bufferWrite;
+	if (inLength <= lenToEnd)
+		pInfo->bufferWrite += inLength;
+	else
+		pInfo->bufferWrite = inLength - lenToEnd;
+}
+void CInputStream::DoPeekBuffer(pInputStreamInfo pInfo, BYTE* pBuffer, BYTE* outData, DWORD inLength)
+{
+	DWORD lenToEnd = pInfo->bufferCapacity - pInfo->bufferPeek;
+	if (lenToEnd > inLength)
+		lenToEnd = inLength;
+
+	if (lenToEnd > 0)
+		memcpy(outData, &pBuffer[pInfo->bufferPeek], lenToEnd);
+
+	DWORD lenRemaining = inLength - lenToEnd;
+	if (lenRemaining > 0)
+		memcpy(&outData[lenToEnd], &pBuffer[0], lenRemaining);
+}
+void CInputStream::IncrementPeekPosition(pInputStreamInfo pInfo, DWORD inLength)
+{
+	DWORD lenToEnd = pInfo->bufferCapacity - pInfo->bufferPeek;
+	if (inLength <= lenToEnd)
+		pInfo->bufferPeek += inLength;
+	else
+		pInfo->bufferPeek = inLength - lenToEnd;
+}
+long CInputStream::AddMessage(DWORD inLength, BYTE* inMessage)
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGA3("CInputStream::AddMessage clientId = %d inLength = %d this = %x\n", m_ClientID, inLength, this);
+
+	long err = TCAPI_ERR_NONE;
+
+	BOOL done = FALSE;
+	if (inLength == 0 || inMessage == NULL)
+		return TCAPI_ERR_NONE;
+
+	BOOL gotIt = WaitForAccess(); // will lock on first access only
+
+	pInputStreamInfo pInfo = GetInfoPtr();
+	pInputStreamData pData = GetDataPtr();
+	pInputStreamFile pFile = GetFilePtr();
+
+	long nfile = pInfo->numberBytesInFile;						// number bytes in file
+	long nbuffer = pInfo->numberBytes - pInfo->numberBytesInFile;	// number bytes in buffer
+
+	TCDEBUGLOGA2("CInputStream::AddMessage numberMessages      =%d numberBytes      =%d\n",pInfo->numberMessages, pInfo->numberBytes);
+
+	DWORD nWriteSize = pInfo->bufferCapacity - pInfo->bufferSize;
+	if (nWriteSize > (inLength+sizeof(DWORD)))
+	{
+		DoWriteBuffer(pInfo, pData, (BYTE*)&inLength, sizeof(DWORD));
+		DoWriteBuffer(pInfo, pData, inMessage, inLength);
+		pInfo->numberBytes += inLength + sizeof(DWORD);
+		pInfo->numberMessages++;
+		done = TRUE;
+	}
+	else
+	{
+		// not enough room
+		// we just lost a message
+		TCDEBUGLOGS("CInputStream::AddMessage buffer overflowed and no file - msg lost\n");
+		LOGPERF("AddMessage buffer overflowed\n");
+		err = TCAPI_ERR_INPUTSTREAM_BUFFER_OVERFLOW_MISSED_MSGS;
+	}
+
+	TCDEBUGLOGA2("CInputStream::AddMessage numberMessages      =%d numberBytes      =%d\n",pInfo->numberMessages, pInfo->numberBytes);
+
+#ifdef LOG_PERFORMANCE
+	char msg[200];
+	sprintf(msg, "AddMessage numMsgs = %d numByts = %d\n", pInfo->numberMessages, pInfo->numberBytes);
+	LOGPERF(msg);
+#endif
+//	ReleaseAccess(); // server will unlock all streams once the buffer is processed if any added
+
+	TCDEBUGLOGA1("CInputStream::AddMessage err=%d\n", err);
+	TCDEBUGCLOSE();
+
+	return err;
+}
+// now only used still by the C++ api
+DWORD CInputStream::GetNextMessage(DWORD inLength, BYTE* outMessage)
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGA3("CInputStream::GetNextMessage clientId = %d inLength = %d this = %x\n", m_ClientID, inLength, this);
+
+	DWORD outMsgLen = 0;
+
+	BOOL gotIt = WaitForAccess();
+	if (gotIt)
+	{
+		DWORD dwMsgSize = 0;
+
+		pInputStreamInfo pInfo = GetInfoPtr();
+		pInputStreamData pData = GetDataPtr();
+		pInputStreamFile pFile = GetFilePtr();
+
+		TCDEBUGLOGA2("CInputStream::GetNextMessage numberMessages      =%d numberBytes      =%d\n",pInfo->numberMessages, pInfo->numberBytes);
+
+		if (pInfo->numberBytes > 0)
+		{
+			DoReadBuffer(pInfo, pData, (BYTE*)&dwMsgSize, sizeof(DWORD));
+			if (inLength > dwMsgSize)
+				inLength = dwMsgSize;
+
+			DoReadBuffer(pInfo, pData, outMessage, inLength);
+			outMsgLen = inLength;
+			pInfo->numberMessages--;
+			pInfo->numberBytes = pInfo->numberBytes - dwMsgSize - sizeof(DWORD);
+		}
+
+		TCDEBUGLOGA2("CInputStream::GetNextMessage numberMessages      =%d numberBytes      =%d\n",pInfo->numberMessages, pInfo->numberBytes);
+
+		ReleaseAccess();
+	}
+
+#ifdef LOG_PERFORMANCE
+//	char msg[200];
+//	sprintf(msg, "GetNextMessage.outMsgLen = %d\n", outMsgLen);
+//	LOGPERF(msg);
+	LOGPERF("GetNextMessage\n");
+#endif
+
+	TCDEBUGLOGS("CInputStream::GetNextMessage done\n");
+	TCDEBUGCLOSE();
+
+	return outMsgLen;
+}
+
+DWORD CInputStream::GetNextMessageSize()
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGA2("CInputStream::GetNextMessageSize clientId = %d this = %x\n", m_ClientID, this);
+
+	DWORD length = 0;
+
+	BOOL gotIt = WaitForAccess();
+	if (gotIt)
+	{
+
+		pInputStreamInfo pInfo = GetInfoPtr();
+		pInputStreamData pData = GetDataPtr();
+
+		if (pInfo->numberMessages > 0)
+		{
+			pInfo->bufferPeek = pInfo->bufferRead;
+			DoPeekBuffer(pInfo, pData, (BYTE*)&length, sizeof(DWORD));
+		}
+
+		ReleaseAccess();
+	}
+
+	TCDEBUGLOGA1("CInputStream::GetNextMessageSize length = %d\n", length);
+	TCDEBUGCLOSE();
+
+#ifdef LOG_PERFORMANCE
+//	char msg[200];
+//	sprintf(msg, "GetNextMessageSize.length = %d\n", length);
+//	LOGPERF(msg);
+	LOGPERF("GetNextMessageSize\n");
+#endif
+	return length;
+}
+void CInputStream::GetMessageSizes(long inNumberMessages, DWORD* outMessageSizes)
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGA3("CInputStream::GetMessageSizes clientId = %d this = %x inNum = %d\n", m_ClientID, this, inNumberMessages);
+
+	BOOL gotIt = WaitForAccess();
+	if (gotIt)
+	{
+
+		pInputStreamData pData = GetDataPtr();
+		pInputStreamFile pFile = GetFilePtr();
+		pInputStreamInfo pInfo = GetInfoPtr();
+
+		DWORD numberToGet = inNumberMessages;
+		if (numberToGet > pInfo->numberMessages) numberToGet = pInfo->numberMessages;
+
+		if (numberToGet > 0)
+		{
+			pInfo->bufferPeek = pInfo->bufferRead;
+			for (DWORD i = 0; i < numberToGet; i++)
+			{
+				DWORD len = 0;
+				DoPeekBuffer(pInfo, pData, (BYTE*)&len, sizeof(DWORD));
+				outMessageSizes[i] = len;
+				IncrementPeekPosition(pInfo, len+sizeof(DWORD));
+			}
+		}
+
+		ReleaseAccess();
+	}
+
+	TCDEBUGLOGS("CInputStream::GetMessageSizes done\n");
+	TCDEBUGCLOSE();
+	LOGPERF("GetMessageSizes\n");
+}
+
+#ifdef TCF_CLIENT
+DWORD CInputStream::GetMessages(JNIEnv* env, long inNumberMessages, long inNumberMaxBytes, long& outNumberBytesRead, long& outNumberMessagesRead, jbyteArray outMessageData)
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGA2("CInputStream::GetMessages inNumberMaxBytes = %d inNumberMessages = %d\n", inNumberMaxBytes, inNumberMessages);
+
+	DWORD dwMsgSize = 0;
+	outNumberBytesRead = outNumberMessagesRead = 0;
+
+	BOOL gotIt = WaitForAccess();
+	if (gotIt)
+	{
+
+		pInputStreamInfo pInfo = GetInfoPtr();
+		pInputStreamData pData = GetDataPtr();
+
+		if (pInfo->numberMessages > 0)
+		{
+			if (inNumberMessages == 0 || (inNumberMessages > pInfo->numberMessages))
+				inNumberMessages = pInfo->numberMessages;
+			for (long i = 0; i < inNumberMessages; i++)
+			{
+				pInfo->bufferPeek = pInfo->bufferRead;
+				DoPeekBuffer(pInfo, pData, (BYTE*)&dwMsgSize, sizeof(DWORD));
+
+				if ((outNumberBytesRead + dwMsgSize) > inNumberMaxBytes)
+					break;
+
+				IncrementReadPosition(pInfo, sizeof(DWORD));
+				pInfo->numberBytes -= sizeof(DWORD);
+
+				DWORD lenToEnd = pInfo->bufferCapacity - pInfo->bufferRead;
+				if (lenToEnd > dwMsgSize)
+					lenToEnd = dwMsgSize;
+
+				if (lenToEnd > 0)
+					env->SetByteArrayRegion(outMessageData, outNumberBytesRead, lenToEnd, (jbyte*)&pData[pInfo->bufferRead]);
+
+				DWORD lenRemaining = dwMsgSize - lenToEnd;
+				if (lenRemaining > 0)
+					env->SetByteArrayRegion(outMessageData, outNumberBytesRead+lenToEnd, lenRemaining, (jbyte*)&pData[0]);
+
+				pInfo->bufferSize -= dwMsgSize;
+
+				IncrementReadPosition(pInfo, dwMsgSize);
+
+				outNumberBytesRead += dwMsgSize;
+				outNumberMessagesRead++;
+				pInfo->numberBytes -= dwMsgSize;
+				pInfo->numberMessages--;	
+
+				if ((i % 500) == 0)
+					Sleep(1);
+			}
+		}
+		if (pInfo->numberBytes == 0)
+		{
+			pInfo->bufferRead = pInfo->bufferWrite = 0;
+		}
+		ReleaseAccess();
+	}
+
+#ifdef LOG_PERFORMANCE
+	char msg[200];
+	sprintf(msg, "GetMessages numMsgs = %d numByts = %d\n", outNumberMessagesRead, outNumberBytesRead);
+	LOGPERF(msg);
+#endif
+
+	TCDEBUGLOGA2("CInputStream::GetMessages outNumberBytesRead = %d outNumberMessagesRead = %d\n", outNumberBytesRead, outNumberMessagesRead);
+	TCDEBUGCLOSE();
+	return outNumberMessagesRead;
+}
+#endif // TCF_CLIENT
+
+void CInputStream::GetTotalMessageSize(long inNumberMessages, DWORD& outTotalSize)
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGA3("CInputStream::GetTotalMessageSize clientId = %d this = %x inNum = %d\n", m_ClientID, this, inNumberMessages);
+
+	outTotalSize = 0;
+	BOOL gotIt = WaitForAccess();
+	if (gotIt)
+	{
+
+		DWORD len = 0;
+
+		pInputStreamData pData = GetDataPtr();
+		pInputStreamFile pFile = GetFilePtr();
+		pInputStreamInfo pInfo = GetInfoPtr();
+
+		DWORD numberToGet = inNumberMessages;
+		if (numberToGet > pInfo->numberMessages) numberToGet = pInfo->numberMessages;
+
+		if (numberToGet > 0)
+		{
+			TCDEBUGLOGA1("CInputStream::GetTotalMessageSize numberToGet = %d\n", numberToGet);
+			pInfo->bufferPeek = pInfo->bufferRead;
+			for (DWORD i = 0; i < numberToGet; i++)
+			{
+				DoPeekBuffer(pInfo, pData, (BYTE*)&len, sizeof(DWORD));
+				outTotalSize += len;
+				IncrementPeekPosition(pInfo, len+sizeof(DWORD));
+			}
+		}
+
+
+		ReleaseAccess();
+	}
+
+#ifdef LOG_PERFORMANCE
+//	char msg[200];
+//	sprintf(msg, "GetTotalMessageSize.outTotalSize = %d\n", outTotalSize);
+	LOGPERF("GetTotalMessageSize\n");
+#endif
+	TCDEBUGLOGA1("CInputStream::GetTotalMessageSize done = %d\n", outTotalSize);
+	TCDEBUGCLOSE();
+
+}
+
+#else // !USE_CIRCULAR_BUFFER
+
+long CInputStream::AddMessage(DWORD inLength, BYTE* inMessage)
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGA3("CInputStream::AddMessage clientId = %d inLength = %d this = %x\n", m_ClientID, inLength, this);
+
+	long err = TCAPI_ERR_NONE;
+
+	BOOL done = FALSE;
+	if (inLength == 0 || inMessage == NULL)
+		return TCAPI_ERR_NONE;
+
+	WaitForAccess();
+
+	pInputStreamInfo pInfo = GetInfoPtr();
+	pInputStreamData pData = GetDataPtr();
+	pInputStreamFile pFile = GetFilePtr();
+
+	long nfile = pInfo->numberBytesInFile;						// number bytes in file
+	long nbuffer = pInfo->numberBytes - pInfo->numberBytesInFile;	// number bytes in buffer
+
+	TCDEBUGLOGA2("CInputStream::AddMessage numberMessages      =%d numberBytes      =%d\n",pInfo->numberMessages, pInfo->numberBytes);
+	TCDEBUGLOGA2("CInputStream::AddMessage numberMessagesInBuff=%d numberBytesInBuff=%d\n",pInfo->numberMessages-pInfo->numberMessagesInFile, pInfo->numberBytes-pInfo->numberBytesInFile);
+	TCDEBUGLOGA2("CInputStream::AddMessage numberMessagesInFile=%d numberBytesInFile=%d\n",pInfo->numberMessagesInFile, pInfo->numberBytesInFile);
+
+	if (nfile > 0)
+	{
+		// we've already overflowed
+		//  and we are using an overflow file
+		//  so attempt to put message there
+		if ((nfile + inLength + sizeof(DWORD)) <= m_FileSize)
+		{
+			// room to put msg into file
+			BYTE* ptr = pFile;
+			ptr += nfile;
+			*(DWORD*)ptr = inLength;
+			ptr += sizeof(DWORD);
+			memcpy(ptr, inMessage, inLength);
+			pInfo->numberMessages++;
+			pInfo->numberMessagesInFile++;
+			pInfo->numberBytes += inLength + sizeof(DWORD);
+			pInfo->numberBytesInFile += inLength + sizeof(DWORD);
+			done = TRUE;
+		}
+		else
+		{
+			// we just lost a message
+			TCDEBUGLOGS("CInputStream::AddMessage file overflowed - msg lost\n");
+			err = TCAPI_ERR_INPUTSTREAM_FILE_OVERFLOW_MISSED_MSGS;
+		}
+	}
+	else // number of bytes in file == 0 ==> either not overflowed or not using file
+	{
+		// attempt to put message into buffer
+		if ((nbuffer + inLength + sizeof(DWORD)) <= m_BufferSize)
+		{
+			// room to put msg into buffer
+			BYTE* ptr = pData;
+			ptr += nbuffer;
+			*(DWORD*)ptr = inLength;
+			ptr += sizeof(DWORD);
+			memcpy(ptr, inMessage, inLength);
+			pInfo->numberMessages++;
+			pInfo->numberBytes += inLength + sizeof(DWORD);
+			done = TRUE;
+		}
+		else
+		{
+			// overflow to file?
+			if (m_File != NULL)
+			{
+				// room to put msg into file
+				BYTE* ptr = pFile;
+				ptr += nfile;
+				*(DWORD*)ptr = inLength;
+				ptr += sizeof(DWORD);
+				memcpy(ptr, inMessage, inLength);
+				pInfo->numberMessages++;
+				pInfo->numberMessagesInFile++;
+				pInfo->numberBytes += inLength + sizeof(DWORD);
+				pInfo->numberBytesInFile += inLength + sizeof(DWORD);
+				done = TRUE;
+				TCDEBUGLOGS("CInputStream::AddMessage buffer overflowed to file\n");
+				LOGPERF("AddMessage buffer overflowed\n");
+				err = TCAPI_INFO_INPUTSTREAM_BUFFER_OVERFLOW_TO_FILE;
+			}
+			else
+			{
+				// we just lost a message
+				TCDEBUGLOGS("CInputStream::AddMessage buffer overflowed and no file - msg lost\n");
+				err = TCAPI_ERR_INPUTSTREAM_BUFFER_OVERFLOW_MISSED_MSGS;
+			}
+		}
+	}
+	TCDEBUGLOGA2("CInputStream::AddMessage numberMessages      =%d numberBytes      =%d\n",pInfo->numberMessages, pInfo->numberBytes);
+	TCDEBUGLOGA2("CInputStream::AddMessage numberMessagesInBuff=%d numberBytesInBuff=%d\n",pInfo->numberMessages-pInfo->numberMessagesInFile, pInfo->numberBytes-pInfo->numberBytesInFile);
+	TCDEBUGLOGA2("CInputStream::AddMessage numberMessagesInFile=%d numberBytesInFile=%d\n",pInfo->numberMessagesInFile, pInfo->numberBytesInFile);
+
+	LOGPERF("AddMessage\n");
+
+	ReleaseAccess();
+
+	TCDEBUGLOGA1("CInputStream::AddMessage err=%d\n", err);
+	TCDEBUGCLOSE();
+
+	return err;
+}
+DWORD CInputStream::GetNextMessage(DWORD inLength, BYTE* outMessage)
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGA3("CInputStream::GetNextMessage clientId = %d inLength = %d this = %x\n", m_ClientID, inLength, this);
+
+	BYTE* ptr, *ptrStart;
+	DWORD outMsgLen = 0;
+
+	WaitForAccess();
+
+	DWORD dwMsgSize = 0;
+
+	pInputStreamInfo pInfo = GetInfoPtr();
+	pInputStreamData pData = GetDataPtr();
+	pInputStreamFile pFile = GetFilePtr();
+
+	ptr = ptrStart = pData;
+	TCDEBUGLOGA2("CInputStream::GetNextMessage numberMessages      =%d numberBytes      =%d\n",pInfo->numberMessages, pInfo->numberBytes);
+	TCDEBUGLOGA2("CInputStream::GetNextMessage numberMessagesInBuff=%d numberBytesInBuff=%d\n",pInfo->numberMessages-pInfo->numberMessagesInFile, pInfo->numberBytes-pInfo->numberBytesInFile);
+	TCDEBUGLOGA2("CInputStream::GetNextMessage numberMessagesInFile=%d numberBytesInFile=%d\n",pInfo->numberMessagesInFile, pInfo->numberBytesInFile);
+
+	if (pInfo->numberBytes > 0)
+	{
+		// get 1 msg from buffer
+		dwMsgSize = *(DWORD*)pData;
+		TCDEBUGLOGA1("CInputStream::GetNextMessage 1st msg in buffer = %d\n", dwMsgSize);
+		ptr += sizeof(DWORD);
+		if (inLength > dwMsgSize)
+			inLength = dwMsgSize;
+
+		memcpy(outMessage, ptr, inLength);
+		outMsgLen = inLength;
+		ptr += dwMsgSize;
+
+		// move up rest of buffer msgs to beginning of buffer
+		long moveLen = m_BufferSize - dwMsgSize - sizeof(DWORD);
+//		memcpy(ptrStart, ptr, moveLen);
+		TCDEBUGLOGA1("CInputStream::GetNextMessage move bytes up in buffer = %d\n", moveLen);
+		
+		// adjust totals
+		pInfo->numberMessages--;
+		pInfo->numberBytes = pInfo->numberBytes - dwMsgSize - sizeof(DWORD);
+		
+		// move next file message up to buffer
+		if (m_File != NULL)
+		{
+			// using file
+			if (pInfo->numberBytesInFile > 0)
+			{
+				// messages exist in file
+				dwMsgSize = *(DWORD*)pFile;
+				TCDEBUGLOGA1("CInputStream::GetNextMessage 1st msg in file   = %d\n", dwMsgSize);
+				long nbuffer = pInfo->numberBytes - pInfo->numberBytesInFile; // number bytes in buffer
+				if ((nbuffer + dwMsgSize + sizeof(DWORD)) <= m_BufferSize)
+				{
+					// room in buffer - move to buffer
+					ptrStart = &pData[nbuffer];
+					moveLen = dwMsgSize + sizeof(DWORD);
+					memcpy(ptrStart, pFile, moveLen);
+					TCDEBUGLOGA1("CInputStream::GetNextMessage move from file to buffer = %d\n", moveLen);
+					LOGPERF("GetNextMessage move from file to buffer\n");
+
+					// adjust file totals
+					pInfo->numberMessagesInFile--;
+					pInfo->numberBytesInFile = pInfo->numberBytesInFile - moveLen;
+
+					// move msgs in file up
+					if (pInfo->numberMessagesInFile > 0)
+					{
+						ptr = &pFile[moveLen];	// new end
+						moveLen = pInfo->numberBytesInFile;
+						memcpy(pFile, ptr, moveLen);
+						TCDEBUGLOGA1("CInputStream::GetNextMessage move bytes up in file = %d\n", moveLen);
+					}
+				}
+				else
+				{
+					// no room in buffer for next file msg - leave it there
+					TCDEBUGLOGS("CInputStream::GetNextMessage no room in buffer for message in file - leave it there\n");
+				}
+			}
+		}
+	}
+
+	TCDEBUGLOGA2("CInputStream::GetNextMessage numberMessages      =%d numberBytes      =%d\n",pInfo->numberMessages, pInfo->numberBytes);
+	TCDEBUGLOGA2("CInputStream::GetNextMessage numberMessagesInBuff=%d numberBytesInBuff=%d\n",pInfo->numberMessages-pInfo->numberMessagesInFile, pInfo->numberBytes-pInfo->numberBytesInFile);
+	TCDEBUGLOGA2("CInputStream::GetNextMessage numberMessagesInFile=%d numberBytesInFile=%d\n",pInfo->numberMessagesInFile, pInfo->numberBytesInFile);
+
+	LOGPERF("GetNextMessage\n");
+
+	ReleaseAccess();
+
+	TCDEBUGLOGS("CInputStream::GetNextMessage done\n");
+	TCDEBUGCLOSE();
+
+	return outMsgLen;
+}
+DWORD CInputStream::GetNextMessageSize()
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGA2("CInputStream::GetNextMessageSize clientId = %d this = %x\n", m_ClientID, this);
+
+	DWORD length = 0;
+
+	WaitForAccess();
+
+	pInputStreamInfo pInfo = GetInfoPtr();
+	pInputStreamData pData = GetDataPtr();
+
+	if (pInfo->numberMessages > 0)
+	{
+		BYTE* ptr = pData;
+		length = *(DWORD*)ptr;
+	}
+
+	ReleaseAccess();
+
+	TCDEBUGLOGA1("CInputStream::GetNextMessageSize length = %d\n", length);
+	TCDEBUGCLOSE();
+
+	LOGPERF("GetNextMessageSize\n");
+	return length;
+}
+void CInputStream::GetMessageSizes(long inNumberMessages, DWORD* outMessageSizes)
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGA3("CInputStream::GetMessageSizes clientId = %d this = %x inNum = %d\n", m_ClientID, this, inNumberMessages);
+
+	WaitForAccess();
+	pInputStreamData pData = GetDataPtr();
+	pInputStreamFile pFile = GetFilePtr();
+	pInputStreamInfo pInfo = GetInfoPtr();
+
+	long ntotal = pInfo->numberMessages;			// number of total messages (buffer + file)
+	long nfile = pInfo->numberMessagesInFile;		// number of file messages
+	long nbuffer = ntotal - nfile;				// number of buffer messages
+	long nreadtotal = min(ntotal, inNumberMessages);	// number of total messages to read
+	long nreadbuffer = min(nbuffer, nreadtotal);		// number of buffer messages to read
+	long nreadfile = nreadtotal - nreadbuffer;			// number of file messages to read
+	TCDEBUGLOGA3("CInputStream::GetMessageSizes ntotal=%d nfile=%d nbuffer=%d\n", ntotal, nfile, nbuffer);
+	TCDEBUGLOGA3("CInputStream::GetMessageSizes nreadtotal=%d nreadfile=%d nreadbuffer=%d\n", nreadtotal, nreadfile, nreadbuffer);
+
+
+	if (nreadtotal > 0)
+	{
+		if (nreadbuffer > 0)
+		{
+			BYTE* ptr = pData;
+			DWORD prevSize = 0;
+			for (long i = 0; i < nreadbuffer; i++)
+			{
+				DWORD len = *(DWORD*)&ptr[prevSize];
+				outMessageSizes[i] = len;
+				prevSize += len + sizeof(DWORD);
+			}
+		}
+		if (nreadfile > 0)
+		{
+			BYTE* ptr = pFile;
+			DWORD prevSize = 0;
+			for (long i = 0; i < nreadfile; i++)
+			{
+				DWORD len = *(DWORD*)&ptr[prevSize];
+				outMessageSizes[nreadbuffer+i] = len;
+				prevSize += len + sizeof(DWORD);
+			}
+		}
+	}
+
+
+	ReleaseAccess();
+
+	TCDEBUGLOGS("CInputStream::GetMessageSizes done\n");
+	TCDEBUGCLOSE();
+	LOGPERF("GetMessageSizes\n");
+}
+
+void CInputStream::GetTotalMessageSize(long inNumberMessages, DWORD& outTotalSize)
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGA3("CInputStream::GetTotalMessageSize clientId = %d this = %x inNum = %d\n", m_ClientID, this, inNumberMessages);
+
+	WaitForAccess();
+	pInputStreamData pData = GetDataPtr();
+	pInputStreamFile pFile = GetFilePtr();
+	pInputStreamInfo pInfo = GetInfoPtr();
+
+	long ntotal = pInfo->numberMessages;			// number of total messages (buffer + file)
+	long nfile = pInfo->numberMessagesInFile;		// number of file messages
+	long nbuffer = ntotal - nfile;				// number of buffer messages
+	long nreadtotal = min(ntotal, inNumberMessages);	// number of total messages to read
+	long nreadbuffer = min(nbuffer, nreadtotal);		// number of buffer messages to read
+	long nreadfile = nreadtotal - nreadbuffer;			// number of file messages to read
+	TCDEBUGLOGA3("CInputStream::GetTotalMessageSize ntotal=%d nfile=%d nbuffer=%d\n", ntotal, nfile, nbuffer);
+	TCDEBUGLOGA3("CInputStream::GetTotalMessageSize nreadtotal=%d nreadfile=%d nreadbuffer=%d\n", nreadtotal, nreadfile, nreadbuffer);
+
+	outTotalSize = 0;
+
+	if (nreadtotal > 0)
+	{
+		if (nreadbuffer > 0)
+		{
+			BYTE* ptr = pData;
+			DWORD prevSize = 0;
+			for (long i = 0; i < nreadbuffer; i++)
+			{
+				DWORD len = *(DWORD*)&ptr[prevSize];
+				outTotalSize += len;
+				prevSize += len + sizeof(DWORD);
+			}
+		}
+		if (nreadfile > 0)
+		{
+			BYTE* ptr = pFile;
+			DWORD prevSize = 0;
+			for (long i = 0; i < nreadfile; i++)
+			{
+				DWORD len = *(DWORD*)&ptr[prevSize];
+				outTotalSize += len;
+				prevSize += len + sizeof(DWORD);
+			}
+		}
+	}
+
+
+	ReleaseAccess();
+
+	TCDEBUGLOGS("CInputStream::GetTotalMessageSize done\n");
+	TCDEBUGCLOSE();
+	LOGPERF("GetTotalMessageSize\n");
+}
+#endif // USE_CIRCULAR_BUFFER
+
+LONG CInputStream::GetNumberMessages()
+{
+	TCDEBUGOPEN();
+	TCDEBUGLOGA2("CInputStream::GetNumberMessages clientId = %d this = %x\n", m_ClientID, this);
+
+	LONG number = 0;
+
+	WaitForAccess();
+
+	pInputStreamInfo pInfo = GetInfoPtr();
+
+	number = pInfo->numberMessages;
+
+	ReleaseAccess();
+
+	TCDEBUGLOGA1("CInputStream::GetNumberMessages number = %d\n", number);
+	TCDEBUGCLOSE();
+
+	LOGPERF("GetNumberMessages\n");
+	return number;
+}
+
+
+
+BOOL CInputStreamData::Init()
+{
+	if (IsCreator())
+	{
+	}
+	return TRUE;
+}
+BOOL CInputStreamInfo::Init()
+{
+	if (IsCreator())
+	{
+		pInputStreamInfo pInfo = (pInputStreamInfo)GetDataPtr();
+		pInfo->numberBytes = 0;
+		pInfo->numberMessages = 0;
+		pInfo->numberBytesInFile = 0;
+		pInfo->numberMessagesInFile = 0;
+#ifdef USE_CIRCULAR_BUFFER
+		pInfo->bufferRead = 0;				// where to read from buffer
+		pInfo->bufferPeek = 0;				// where to peek some data from buffer
+		pInfo->bufferWrite = 0;				// where to write to buffer
+		pInfo->bufferSize = 0;				// current size of buffer
+		pInfo->bufferCapacity = 0;			// total capacity of buffer
+#endif
+	}
+	return TRUE;
+}
+
+// real file on disk (ie, not tied to swap file)
+void CInputStreamFile::SetClientId(long clientId)
+{
+	m_ClientID = clientId;
+}
+BOOL CInputStreamFile::Open(DWORD dwSize, CHAR* filePath)
+{
+	BOOL fOk = FALSE;
+
+	// first process (clients) will create and map file
+	// second process (server) will get an error on create, but will go ahead and map
+	m_hFile = CreateFile(
+		filePath, 
+		GENERIC_READ|GENERIC_WRITE, 
+		FILE_SHARE_READ|FILE_SHARE_WRITE,
+		0,
+		CREATE_ALWAYS, 
+		FILE_ATTRIBUTE_NORMAL|FILE_FLAG_DELETE_ON_CLOSE,
+		0);
+
+	char mapname[80];
+	sprintf(mapname, "%s%d", INPUTSTREAMOVERFLOW_MAP_BASENAME, m_ClientID);
+	if (m_hFile != INVALID_HANDLE_VALUE)
+	{
+		// we got a good handle (we might have gotten an error, but create was successful)
+		// create a mapping to this file
+		fOk = CSharedData::Open(m_hFile, dwSize, mapname);
+		TCDEBUGOPEN();
+		TCDEBUGLOGS("CInputStreamFile::Open CreateFile successful\n");
+		TCDEBUGCLOSE();
+	}
+	else
+	{
+		// this causes the open to do an OpenFileMapping instead of a CreateFileMapping (file handle not needed in former)
+		fOk = CSharedData::Open(INVALID_HANDLE_VALUE, dwSize, mapname);
+		TCDEBUGOPEN();
+		TCDEBUGLOGS("CInputStreamFile::Open CreateFile failed\n");
+		TCDEBUGCLOSE();
+	}
+
+	return fOk;
+}
+
+CInputStreamFile::Close()
+{
+	// close all mapping handles
+	CSharedData::Close();
+	
+	// close file
+	if (m_hFile != INVALID_HANDLE_VALUE)
+	{
+		CloseHandle(m_hFile);
+		m_hFile = INVALID_HANDLE_VALUE;
+	}
+}
+
+BOOL CInputStreamFile::Init()
+{
+	return TRUE;
+}
+
+#ifdef LOG_PERFORMANCE
+static void logPerf(char* msg)
+{
+	if (fpLog)
+	{
+		SYSTEMTIME sTime;
+		GetLocalTime(&sTime);
+		fprintf(fpLog, 
+			"%02.2d%02.2d-%02.2d:%02.2d:%02.2d.%03.3d: %s",
+			sTime.wDay, sTime.wMonth, sTime.wHour, sTime.wMinute, sTime.wSecond, sTime.wMilliseconds,
+			msg);
+
+		numLogged++;
+		if ((numLogged % 1000) == 0)
+			fflush(fpLog);
+	}
+}
+static void openPerf()
+{
+	struct _stat buf;
+	char* dirname = "c:\\tcf";
+	int result = _stat(dirname, &buf);
+	if (result == 0) // exists
+	{
+		if (fpLog == NULL)
+			fpLog = _fsopen(perfFileName, "at", _SH_DENYNO);
+	}
+	else
+	{
+		fpLog = NULL;
+	}
+}
+static void closePerf()
+{
+	if (fpLog)
+	{
+		fflush(fpLog);
+		fclose(fpLog);
+	}
+	fpLog = NULL;
+}
+#endif
\ No newline at end of file