connectivity/com.nokia.tcf/src/com/nokia/tcf/impl/TCMessageInputStream.java
changeset 0 fb279309251b
child 429 ddde4cae03be
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/connectivity/com.nokia.tcf/src/com/nokia/tcf/impl/TCMessageInputStream.java	Fri Apr 03 23:33:03 2009 +0100
@@ -0,0 +1,367 @@
+/*
+* Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies).
+* All rights reserved.
+* This component and the accompanying materials are made available
+* under the terms of the License "Eclipse Public License v1.0"
+* which accompanies this distribution, and is available
+* at the URL "http://www.eclipse.org/legal/epl-v10.html".
+*
+* Initial Contributors:
+* Nokia Corporation - initial contribution.
+*
+* Contributors:
+*
+* Description: 
+*
+*/
+/**
+ * 
+ */
+package com.nokia.tcf.impl;
+
+import java.io.IOException;
+
+import org.eclipse.core.runtime.IStatus;
+
+import com.nokia.tcf.api.ITCMessage;
+import com.nokia.tcf.api.ITCMessageInputStream;
+import com.nokia.tcf.api.TCErrorConstants;
+import com.nokia.tcf.api.TCFClassFactory;
+
+public class TCMessageInputStream implements ITCMessageInputStream {
+
+	private long inputStreamSize;
+	private long clientId;
+	private TCAPIConnection api;
+	private boolean isOpen = false;
+	private int blockingTime = 10;
+	private long MAX_BYTES = 2*1024*1024;
+
+	/**
+	 * Constructor
+	 * 
+	 * @param inputStreamSize
+	 * @param overFlowToFile
+	 * @param clientId
+	 */
+	public TCMessageInputStream(TCAPIConnection apiConnection, long inputStreamSize, long clientId) {
+		this.inputStreamSize = inputStreamSize;
+		this.clientId = clientId;
+		api = apiConnection;
+	}
+
+	public int peekMessages() throws IOException {
+		int number = 0;
+		if (!this.isOpen) {
+			// stream not open
+			String errString = TCErrorConstants.getErrorMessage(TCErrorConstants.TCAPI_ERR_INPUTSTREAM_CLOSED);
+			IOException e = new IOException(errString);
+			e.fillInStackTrace();
+			throw e;
+		}
+		// poll for # messages in stream
+		long[] outNumberTotalMessages = new long[1];
+		long ret = api.nativePollInputStream(this.clientId, outNumberTotalMessages);
+		if (ret != TCErrorConstants.TCAPI_ERR_NONE) {
+			// error on first poll
+			String errString = TCErrorConstants.getErrorMessage((int)ret);
+			IOException e = new IOException(errString);
+			e.fillInStackTrace();
+			throw e;
+		}
+		number = (int)outNumberTotalMessages[0];
+		
+		return number;
+	}
+
+	public boolean isOpen() {
+		return this.isOpen;
+	}
+
+	public void open() throws IOException {
+		if (!this.isOpen) {
+			// Remove overflow to file and input stream file path arguments from nativeOpenInputStream()
+			long ret = api.nativeOpenInputStream(clientId, null, inputStreamSize, false);
+			if (ret == TCErrorConstants.TCAPI_ERR_NONE) {
+				isOpen = true;
+			} else {
+				String errString = TCErrorConstants.getErrorMessage((int)ret);
+				IOException e = new IOException(errString);
+				e.fillInStackTrace();
+				throw e;
+			}
+		}
+	}
+
+	public ITCMessage readMessage() throws IOException {
+		ITCMessage message = null;
+
+		// if not open throw exception
+		if (!this.isOpen) {
+			// stream not open
+			String errString = TCErrorConstants.getErrorMessage(TCErrorConstants.TCAPI_ERR_INPUTSTREAM_CLOSED);
+			IOException e = new IOException(errString);
+			e.fillInStackTrace();
+			throw e;
+		}
+
+		// poll for # messages in stream
+		long[] outNumberTotalMessages = new long[1];
+		outNumberTotalMessages[0] = 0;
+		do {
+			if (!this.isOpen) {
+				// stream not open
+				String errString = TCErrorConstants.getErrorMessage(TCErrorConstants.TCAPI_ERR_INPUTSTREAM_CLOSED);
+				IOException e = new IOException(errString);
+				e.fillInStackTrace();
+				throw e;
+			}
+			long ret = api.nativePollInputStream(this.clientId, outNumberTotalMessages);
+			if (ret != TCErrorConstants.TCAPI_ERR_NONE) {
+				// error on first poll
+				String errString = TCErrorConstants.getErrorMessage((int)ret);
+				IOException e = new IOException(errString);
+				e.fillInStackTrace();
+				throw e;
+			}
+			if (outNumberTotalMessages[0] == 0) {
+				try {
+					Thread.sleep(blockingTime);
+				} catch (InterruptedException e) {
+					break;
+				}
+			}
+		} while (outNumberTotalMessages[0] == 0);
+		
+		// check # in stream
+		if (outNumberTotalMessages[0] <= 0) {
+			// no messages in stream
+			return message;
+		}
+		// get number of bytes in first message
+		long[] outNumberBytesInMessage = new long[1];
+		long ret = api.nativeGetInputStreamMessageBytes(this.clientId, 1, outNumberBytesInMessage);
+		if (ret != TCErrorConstants.TCAPI_ERR_NONE) {
+			// error on getting bytes in message
+			String errString = TCErrorConstants.getErrorMessage((int)ret);
+			IOException e = new IOException(errString);
+			e.fillInStackTrace();
+			throw e;
+		}
+
+		// OK, read first message
+		long totalBytes = outNumberBytesInMessage[0];
+		byte[] outMessage = new byte[(int)totalBytes];
+		long[] outNumberMessages = new long[1];
+		long[] outNumberBytesRead = new long[1];
+		ret = api.nativeReadInputStream(this.clientId, 1, outNumberMessages, outNumberBytesRead, totalBytes, outMessage);
+		if (ret != TCErrorConstants.TCAPI_ERR_NONE) {
+			// error on read message
+			String errString = TCErrorConstants.getErrorMessage((int)ret);
+			IOException e = new IOException(errString);
+			e.fillInStackTrace();
+			throw e;
+		}
+		
+		// OK, pass back the message
+		message = TCFClassFactory.createITCMessage(outMessage);
+		
+		return message;
+	}
+
+	public ITCMessage[] readMessages(int inNumberMessages) throws IOException {
+		ITCMessage[] messages = null;
+
+		// if not open throw exception
+		if (!this.isOpen) {
+			// stream not open
+			String errString = TCErrorConstants.getErrorMessage(TCErrorConstants.TCAPI_ERR_INPUTSTREAM_CLOSED);
+			IOException e = new IOException(errString);
+			e.fillInStackTrace();
+			throw e;
+		}
+		// poll for # messages in stream
+		long[] outNumberTotalMessages = new long[1];
+		outNumberTotalMessages[0] = 0;
+		do {
+			if (!this.isOpen) {
+				// stream not open
+				String errString = TCErrorConstants.getErrorMessage(TCErrorConstants.TCAPI_ERR_INPUTSTREAM_CLOSED);
+				IOException e = new IOException(errString);
+				e.fillInStackTrace();
+				throw e;
+			}
+			long ret = api.nativePollInputStream(this.clientId, outNumberTotalMessages);
+			if (ret != TCErrorConstants.TCAPI_ERR_NONE) {
+				// error on first poll
+				String errString = TCErrorConstants.getErrorMessage((int)ret);
+				IOException e = new IOException(errString);
+				e.fillInStackTrace();
+				throw e;
+			}
+			if (outNumberTotalMessages[0] == 0) {
+				try {
+					Thread.sleep(blockingTime);
+				} catch (InterruptedException e) {
+					break;
+				}
+			}
+		} while(outNumberTotalMessages[0] == 0);
+		
+		// check # in stream
+		if (outNumberTotalMessages[0] <= 0) {
+			// no messages in stream
+			return messages;
+		}
+		if (inNumberMessages == 0) {
+			inNumberMessages = (int)outNumberTotalMessages[0];
+		} else if (inNumberMessages > outNumberTotalMessages[0]) {
+			inNumberMessages = (int)outNumberTotalMessages[0];
+		}
+		// get number of bytes in each message
+		long[] outNumberBytesInMessages = new long[inNumberMessages];
+		long ret = api.nativeGetInputStreamMessageBytes(this.clientId, inNumberMessages, outNumberBytesInMessages);
+		if (ret != TCErrorConstants.TCAPI_ERR_NONE) {
+			// error on getting bytes in message
+			String errString = TCErrorConstants.getErrorMessage((int)ret);
+			IOException e = new IOException(errString);
+			e.fillInStackTrace();
+			throw e;
+		}
+		// read all requested messages
+		int totalBytes = 0;
+		for (int i = 0; i < inNumberMessages; i++) {
+			totalBytes += outNumberBytesInMessages[i];
+		}
+		if (totalBytes > MAX_BYTES) totalBytes = (int)MAX_BYTES;
+		byte[] messageData = new byte[totalBytes];
+		long[] outNumberMessages = new long[1];
+		long[] outNumberBytesRead = new long[1];
+		ret = api.nativeReadInputStream(this.clientId, inNumberMessages, outNumberMessages, outNumberBytesRead, totalBytes, messageData);
+		if (ret != TCErrorConstants.TCAPI_ERR_NONE) {
+			// error on getting bytes in message
+			String errString = TCErrorConstants.getErrorMessage((int)ret);
+			IOException e = new IOException(errString);
+			e.fillInStackTrace();
+			throw e;
+		}
+		int messageDataStart = 0;
+		messages = new TCMessage[(int)outNumberMessages[0]];
+		for (int i = 0; i < outNumberMessages[0]; i++) {
+			int size = (int)outNumberBytesInMessages[i];
+			byte[] data = new byte[size];
+			for (int j = 0; j < size; j++) {
+				data[j] = messageData[messageDataStart + j];
+			}
+			messages[i] = TCFClassFactory.createITCMessage(data);
+			messageDataStart += size;
+		}
+		return messages;
+	}
+
+	public void close() throws IOException {
+		if (this.isOpen) {
+			// Stop capturing first
+			IStatus status = api.stop();
+			if (status.isOK()) {
+				// ok, now close the stream
+				long ret = api.nativeCloseInputStream(this.clientId);
+				if (ret == TCErrorConstants.TCAPI_ERR_NONE) {
+					isOpen = false;
+				} else {
+					String errString = TCErrorConstants.getErrorMessage((int)ret);
+					IOException e = new IOException(errString);
+					e.fillInStackTrace();
+					throw e;
+				}
+			} else {
+				String errString = status.getMessage();
+				IOException e = new IOException(errString);
+				e.fillInStackTrace();
+				throw e;
+			}
+		}		
+	}
+	public byte[] readBytes(int inNumberMessages) throws IOException {
+		// if not open throw exception
+		if (!this.isOpen) {
+			// stream not open
+			String errString = TCErrorConstants.getErrorMessage(TCErrorConstants.TCAPI_ERR_INPUTSTREAM_CLOSED);
+			IOException e = new IOException(errString);
+			e.fillInStackTrace();
+			throw e;
+		}
+		inNumberMessages = 0; // 0 --> get all that fit in MAX_BYTES
+		byte[] messageData = new byte[(int) MAX_BYTES];
+		long[] outNumberMessagesRead = new long[1];
+		outNumberMessagesRead[0] = 0;
+		long[] outNumberBytesRead = new long[1];
+		do {
+			long ret = api.nativeReadInputStream(this.clientId, inNumberMessages, outNumberMessagesRead, outNumberBytesRead, MAX_BYTES, messageData);
+			if (ret != TCErrorConstants.TCAPI_ERR_NONE) {
+				// error on getting bytes in message
+				String errString = TCErrorConstants.getErrorMessage((int)ret);
+				IOException e = new IOException(errString);
+				e.fillInStackTrace();
+				throw e;
+			}
+			if (outNumberMessagesRead[0] == 0) {
+				try {
+					Thread.sleep(blockingTime);
+				} catch (InterruptedException e) {
+					break;
+				}
+			}
+		} while (outNumberMessagesRead[0] == 0);
+
+		// return how many messages actually processed
+		byte[] newMessageData = new byte[(int)outNumberBytesRead[0]];
+		for (int i = 0; i < outNumberBytesRead[0]; i++) {
+			newMessageData[i] = messageData[i]; 
+		}
+		return newMessageData;
+	}
+	public byte[] readBytes(int inNumberMessages, int[] outNumberMessages) throws IOException {
+		outNumberMessages[0] = 0;
+		
+		// if not open throw exception
+		if (!this.isOpen) {
+			// stream not open
+			String errString = TCErrorConstants.getErrorMessage(TCErrorConstants.TCAPI_ERR_INPUTSTREAM_CLOSED);
+			IOException e = new IOException(errString);
+			e.fillInStackTrace();
+			throw e;
+		}
+		inNumberMessages = 0; // 0 --> get all that fit in MAX_BYTES
+		byte[] messageData = new byte[(int) MAX_BYTES];
+		long[] outNumberMessagesRead = new long[1];
+		outNumberMessagesRead[0] = 0;
+		long[] outNumberBytesRead = new long[1];
+		do {
+			long ret = api.nativeReadInputStream(this.clientId, inNumberMessages, outNumberMessagesRead, outNumberBytesRead, MAX_BYTES, messageData);
+			if (ret != TCErrorConstants.TCAPI_ERR_NONE) {
+				// error on getting bytes in message
+				String errString = TCErrorConstants.getErrorMessage((int)ret);
+				IOException e = new IOException(errString);
+				e.fillInStackTrace();
+				throw e;
+			}
+			if (outNumberMessagesRead[0] == 0) {
+				try {
+					Thread.sleep(blockingTime);
+				} catch (InterruptedException e) {
+					break;
+				}
+			}
+		} while (outNumberMessagesRead[0] == 0);
+		
+		// return how many messages actually processed
+		byte[] newMessageData = new byte[(int)outNumberBytesRead[0]];
+		for (int i = 0; i < outNumberBytesRead[0]; i++) {
+			newMessageData[i] = messageData[i]; 
+		}
+		outNumberMessages[0] = (int)outNumberMessagesRead[0];
+		return newMessageData;
+		
+	}
+}