connectivity/com.nokia.tcf/src/com/nokia/tcf/impl/TCMessageInputStream.java
author Chad Peckham <chad.peckham@nokia.com>
Wed, 12 Aug 2009 14:55:55 -0500
changeset 429 ddde4cae03be
parent 0 fb279309251b
child 2042 2c44aae86249
permissions -rw-r--r--
merge change for EDC from carbidecpp35

/*
* Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies).
* All rights reserved.
* This component and the accompanying materials are made available
* under the terms of the License "Eclipse Public License v1.0"
* which accompanies this distribution, and is available
* at the URL "http://www.eclipse.org/legal/epl-v10.html".
*
* Initial Contributors:
* Nokia Corporation - initial contribution.
*
* Contributors:
*
* Description: 
*
*/
/**
 * 
 */
package com.nokia.tcf.impl;

import java.io.IOException;

import org.eclipse.core.runtime.IStatus;

import com.nokia.tcf.api.ITCMessage;
import com.nokia.tcf.api.ITCMessageInputStream;
import com.nokia.tcf.api.TCErrorConstants;
import com.nokia.tcf.api.TCFClassFactory;

public class TCMessageInputStream implements ITCMessageInputStream {

	private long inputStreamSize;
	private long clientId;
	private TCAPIConnection api;
	private boolean isOpen = false;
	private int blockingTime = 10;
	private long MAX_BYTES = 2*1024*1024;

	/**
	 * Constructor
	 * 
	 * @param inputStreamSize
	 * @param overFlowToFile
	 * @param clientId
	 */
	public TCMessageInputStream(TCAPIConnection apiConnection, long inputStreamSize, long clientId) {
		this.inputStreamSize = inputStreamSize;
		this.clientId = clientId;
		api = apiConnection;
	}

	public int peekMessages() throws IOException {
		int number = 0;
		if (!this.isOpen) {
			// stream not open
			String errString = TCErrorConstants.getErrorMessage(TCErrorConstants.TCAPI_ERR_INPUTSTREAM_CLOSED);
			IOException e = new IOException(errString);
			e.fillInStackTrace();
			throw e;
		}
		// poll for # messages in stream
		long[] outNumberTotalMessages = new long[1];
		long ret = api.nativePollInputStream(this.clientId, outNumberTotalMessages);
		if (ret != TCErrorConstants.TCAPI_ERR_NONE) {
			// error on first poll
			String errString = TCErrorConstants.getErrorMessage((int)ret);
			IOException e = new IOException(errString);
			e.fillInStackTrace();
			throw e;
		}
		number = (int)outNumberTotalMessages[0];
		
		return number;
	}

	public boolean isOpen() {
		return this.isOpen;
	}

	public void open() throws IOException {
		if (!this.isOpen) {
			// Remove overflow to file and input stream file path arguments from nativeOpenInputStream()
			long ret = api.nativeOpenInputStream(clientId, null, inputStreamSize, false);
			if (ret == TCErrorConstants.TCAPI_ERR_NONE) {
				isOpen = true;
			} else {
				String errString = TCErrorConstants.getErrorMessage((int)ret);
				IOException e = new IOException(errString);
				e.fillInStackTrace();
				throw e;
			}
		}
	}

	public ITCMessage readMessage() throws IOException {
		ITCMessage message = null;

		// if not open throw exception
		if (!this.isOpen) {
			// stream not open
			String errString = TCErrorConstants.getErrorMessage(TCErrorConstants.TCAPI_ERR_INPUTSTREAM_CLOSED);
			IOException e = new IOException(errString);
			e.fillInStackTrace();
			throw e;
		}

		// poll for # messages in stream
		long[] outNumberTotalMessages = new long[1];
		outNumberTotalMessages[0] = 0;
		do {
			if (!this.isOpen) {
				// stream not open
				String errString = TCErrorConstants.getErrorMessage(TCErrorConstants.TCAPI_ERR_INPUTSTREAM_CLOSED);
				IOException e = new IOException(errString);
				e.fillInStackTrace();
				throw e;
			}
			long ret = api.nativePollInputStream(this.clientId, outNumberTotalMessages);
			if (ret != TCErrorConstants.TCAPI_ERR_NONE) {
				// error on first poll
				String errString = TCErrorConstants.getErrorMessage((int)ret);
				IOException e = new IOException(errString);
				e.fillInStackTrace();
				throw e;
			}
			if (outNumberTotalMessages[0] == 0) {
				try {
					Thread.sleep(blockingTime);
				} catch (InterruptedException e) {
					break;
				}
			}
		} while (outNumberTotalMessages[0] == 0);
		
		// check # in stream
		if (outNumberTotalMessages[0] <= 0) {
			// no messages in stream
			return message;
		}
		// get number of bytes in first message
		long[] outNumberBytesInMessage = new long[1];
		long ret = api.nativeGetInputStreamMessageBytes(this.clientId, 1, outNumberBytesInMessage);
		if (ret != TCErrorConstants.TCAPI_ERR_NONE) {
			// error on getting bytes in message
			String errString = TCErrorConstants.getErrorMessage((int)ret);
			IOException e = new IOException(errString);
			e.fillInStackTrace();
			throw e;
		}

		// OK, read first message
		long totalBytes = outNumberBytesInMessage[0];
		byte[] outMessage = new byte[(int)totalBytes];
		long[] outNumberMessages = new long[1];
		long[] outNumberBytesRead = new long[1];
		ret = api.nativeReadInputStream(this.clientId, 1, outNumberMessages, outNumberBytesRead, totalBytes, outMessage);
		if (ret != TCErrorConstants.TCAPI_ERR_NONE) {
			// error on read message
			String errString = TCErrorConstants.getErrorMessage((int)ret);
			IOException e = new IOException(errString);
			e.fillInStackTrace();
			throw e;
		}
		
		// OK, pass back the message
		message = TCFClassFactory.createITCMessage(outMessage);
		
		return message;
	}

	public ITCMessage[] readMessages(int inNumberMessages) throws IOException {
		ITCMessage[] messages = null;

		// if not open throw exception
		if (!this.isOpen) {
			// stream not open
			String errString = TCErrorConstants.getErrorMessage(TCErrorConstants.TCAPI_ERR_INPUTSTREAM_CLOSED);
			IOException e = new IOException(errString);
			e.fillInStackTrace();
			throw e;
		}
		// poll for # messages in stream
		long[] outNumberTotalMessages = new long[1];
		outNumberTotalMessages[0] = 0;
		do {
			if (!this.isOpen) {
				// stream not open
				String errString = TCErrorConstants.getErrorMessage(TCErrorConstants.TCAPI_ERR_INPUTSTREAM_CLOSED);
				IOException e = new IOException(errString);
				e.fillInStackTrace();
				throw e;
			}
			long ret = api.nativePollInputStream(this.clientId, outNumberTotalMessages);
			if (ret != TCErrorConstants.TCAPI_ERR_NONE) {
				// error on first poll
				String errString = TCErrorConstants.getErrorMessage((int)ret);
				IOException e = new IOException(errString);
				e.fillInStackTrace();
				throw e;
			}
			if (outNumberTotalMessages[0] == 0) {
				try {
					Thread.sleep(blockingTime);
				} catch (InterruptedException e) {
					break;
				}
			}
		} while(outNumberTotalMessages[0] == 0);
		
		// check # in stream
		if (outNumberTotalMessages[0] <= 0) {
			// no messages in stream
			return messages;
		}
		if (inNumberMessages == 0) {
			inNumberMessages = (int)outNumberTotalMessages[0];
		} else if (inNumberMessages > outNumberTotalMessages[0]) {
			inNumberMessages = (int)outNumberTotalMessages[0];
		}
		// get number of bytes in each message
		long[] outNumberBytesInMessages = new long[inNumberMessages];
		long ret = api.nativeGetInputStreamMessageBytes(this.clientId, inNumberMessages, outNumberBytesInMessages);
		if (ret != TCErrorConstants.TCAPI_ERR_NONE) {
			// error on getting bytes in message
			String errString = TCErrorConstants.getErrorMessage((int)ret);
			IOException e = new IOException(errString);
			e.fillInStackTrace();
			throw e;
		}
		// read all requested messages
		int totalBytes = 0;
		for (int i = 0; i < inNumberMessages; i++) {
			totalBytes += outNumberBytesInMessages[i];
		}
		if (totalBytes > MAX_BYTES) totalBytes = (int)MAX_BYTES;
		byte[] messageData = new byte[totalBytes];
		long[] outNumberMessages = new long[1];
		long[] outNumberBytesRead = new long[1];
		ret = api.nativeReadInputStream(this.clientId, inNumberMessages, outNumberMessages, outNumberBytesRead, totalBytes, messageData);
		if (ret != TCErrorConstants.TCAPI_ERR_NONE) {
			// error on getting bytes in message
			String errString = TCErrorConstants.getErrorMessage((int)ret);
			IOException e = new IOException(errString);
			e.fillInStackTrace();
			throw e;
		}
		int messageDataStart = 0;
		messages = new TCMessage[(int)outNumberMessages[0]];
		for (int i = 0; i < outNumberMessages[0]; i++) {
			int size = (int)outNumberBytesInMessages[i];
			byte[] data = new byte[size];
			for (int j = 0; j < size; j++) {
				data[j] = messageData[messageDataStart + j];
			}
			messages[i] = TCFClassFactory.createITCMessage(data);
			messageDataStart += size;
		}
		return messages;
	}

	public void close() throws IOException {
		if (this.isOpen) {
			// Stop capturing first
			IStatus status = api.stop();
			if (status.isOK()) {
				// ok, now close the stream
				long ret = api.nativeCloseInputStream(this.clientId);
				if (ret == TCErrorConstants.TCAPI_ERR_NONE) {
					isOpen = false;
				} else {
					String errString = TCErrorConstants.getErrorMessage((int)ret);
					IOException e = new IOException(errString);
					e.fillInStackTrace();
					throw e;
				}
			} else {
				String errString = status.getMessage();
				IOException e = new IOException(errString);
				e.fillInStackTrace();
				throw e;
			}
		}		
	}
	public byte[] readBytes(int inNumberMessages) throws IOException {
		// if not open throw exception
		if (!this.isOpen) {
			// stream not open
			String errString = TCErrorConstants.getErrorMessage(TCErrorConstants.TCAPI_ERR_INPUTSTREAM_CLOSED);
			IOException e = new IOException(errString);
			e.fillInStackTrace();
			throw e;
		}
//		inNumberMessages = 0; // 0 --> get all that fit in MAX_BYTES
		byte[] messageData = new byte[(int) MAX_BYTES];
		long[] outNumberMessagesRead = new long[1];
		outNumberMessagesRead[0] = 0;
		long[] outNumberBytesRead = new long[1];
		do {
			long ret = api.nativeReadInputStream(this.clientId, inNumberMessages, outNumberMessagesRead, outNumberBytesRead, MAX_BYTES, messageData);
			if (ret != TCErrorConstants.TCAPI_ERR_NONE) {
				// error on getting bytes in message
				String errString = TCErrorConstants.getErrorMessage((int)ret);
				IOException e = new IOException(errString);
				e.fillInStackTrace();
				throw e;
			}
			if (outNumberMessagesRead[0] == 0) {
				try {
					Thread.sleep(blockingTime);
				} catch (InterruptedException e) {
					break;
				}
			}
		} while (outNumberMessagesRead[0] == 0);

		// return how many messages actually processed
		byte[] newMessageData = new byte[(int)outNumberBytesRead[0]];
		for (int i = 0; i < outNumberBytesRead[0]; i++) {
			newMessageData[i] = messageData[i]; 
		}
		return newMessageData;
	}
	public byte[] readBytes(int inNumberMessages, int[] outNumberMessages) throws IOException {
		outNumberMessages[0] = 0;
		
		// if not open throw exception
		if (!this.isOpen) {
			// stream not open
			String errString = TCErrorConstants.getErrorMessage(TCErrorConstants.TCAPI_ERR_INPUTSTREAM_CLOSED);
			IOException e = new IOException(errString);
			e.fillInStackTrace();
			throw e;
		}
		inNumberMessages = 0; // 0 --> get all that fit in MAX_BYTES
		byte[] messageData = new byte[(int) MAX_BYTES];
		long[] outNumberMessagesRead = new long[1];
		outNumberMessagesRead[0] = 0;
		long[] outNumberBytesRead = new long[1];
		do {
			long ret = api.nativeReadInputStream(this.clientId, inNumberMessages, outNumberMessagesRead, outNumberBytesRead, MAX_BYTES, messageData);
			if (ret != TCErrorConstants.TCAPI_ERR_NONE) {
				// error on getting bytes in message
				String errString = TCErrorConstants.getErrorMessage((int)ret);
				IOException e = new IOException(errString);
				e.fillInStackTrace();
				throw e;
			}
			if (outNumberMessagesRead[0] == 0) {
				try {
					Thread.sleep(blockingTime);
				} catch (InterruptedException e) {
					break;
				}
			}
		} while (outNumberMessagesRead[0] == 0);
		
		// return how many messages actually processed
		byte[] newMessageData = new byte[(int)outNumberBytesRead[0]];
		for (int i = 0; i < outNumberBytesRead[0]; i++) {
			newMessageData[i] = messageData[i]; 
		}
		outNumberMessages[0] = (int)outNumberMessagesRead[0];
		return newMessageData;
		
	}
}