diff -r f5050f1da672 -r 04becd199f91 javacommons/comms/src/commsendpoint.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/javacommons/comms/src/commsendpoint.cpp Tue Apr 27 16:30:29 2010 +0300 @@ -0,0 +1,512 @@ +/* +* Copyright (c) 2008 Nokia Corporation and/or its subsidiary(-ies). +* All rights reserved. +* This component and the accompanying materials are made available +* under the terms of "Eclipse Public License v1.0" +* which accompanies this distribution, and is available +* at the URL "http://www.eclipse.org/legal/epl-v10.html". +* +* Initial Contributors: +* Nokia Corporation - initial contribution. +* +* Contributors: +* +* Description: This is an abstract base class for Comms endpoints. +* +*/ + +#include +#include +#include +#include +#include + +#include "logger.h" + +#include "commsendpoint.h" +#include "commslistener.h" +#include "commsmessage.h" + +#include "listeners.h" + +namespace java +{ +namespace comms +{ +using java::util::ScopedLock; + +OS_EXPORT CommsEndpoint::CommsEndpoint() : mDefaultListener(0), mMessageRefCounter(0), mVm(0), mJNIEnv(0) +{ +// JELOG2(EJavaComms); + pthread_mutex_init(&mSendReceiveMutex, 0); + pthread_cond_init(&mSendReceiveCondition, 0); +} + +OS_EXPORT CommsEndpoint::~CommsEndpoint() +{ +// JELOG2(EJavaComms); + + for (listeners_t::iterator it = mListeners.begin(); it != mListeners.end(); it++) + { + delete it->second; + } + mListeners.clear(); + + delete mDefaultListener; + + pthread_mutex_destroy(&mSendReceiveMutex); + pthread_cond_destroy(&mSendReceiveCondition); +} + +OS_EXPORT void CommsEndpoint::handleIpcMessage(CommsMessage& aMessage) +{ +// JELOG2(EJavaComms); + + /* Message routing is done in following steps: + * 1. check if received message is reply to sendReceive method call + * 2. use moduleId to find correct listener + * */ + if (! isSendReceiveMessage(aMessage)) + { + handleMessage(aMessage); + } +} + + +OS_EXPORT int CommsEndpoint::handleMessage(CommsMessage& aMessage) +{ +// JELOG2(EJavaComms); + ScopedLock lock(mListenersMutex); + + + if (mVm && !mJNIEnv) + { + // attach to VM + mVm->AttachCurrentThread((void**)&mJNIEnv, (void *)NULL); + } + + + listeners_t::iterator it = mListeners.find(aMessage.getModuleId()); + + if (it != mListeners.end()) + { + (it->second)->processMessage(aMessage, mJNIEnv); + } + else + { + if (mDefaultListener) + { + mDefaultListener->processMessage(aMessage, mJNIEnv); + } + else + { + WLOG3(EJavaComms, "CommsEndpoint::handleMessage: No handlers found, msgId=%d, moduleId=%d, messageRef=%d", + aMessage.getMessageId(), + aMessage.getModuleId(), + aMessage.getMessageRef()); + } + } + return 0; +} + +OS_EXPORT int CommsEndpoint::registerListener(int aModuleId, CommsListener* aListener) +{ +// JELOG2(EJavaComms); + ScopedLock lock(mListenersMutex); + if (!aListener) return EINVAL; + + int rc = 0; + + if (mListeners.find(aModuleId) != mListeners.end()) + { + rc = EEXIST; // already exists + ELOG2(EJavaComms, "%s failed, listener already exists %d", __PRETTY_FUNCTION__, aModuleId); + } + else + { + mListeners.insert(std::make_pair(aModuleId, new ListenerWrapper(aListener))); + LOG1(EJavaComms, EInfo, "Registered listener for module id %d", aModuleId); + } + + return rc; +} + +OS_EXPORT int CommsEndpoint::unregisterListener(int aModuleId, CommsListener* /*aListener*/) +{ +// JELOG2(EJavaComms); + ScopedLock lock(mListenersMutex); + int rc = 0; + + listeners_t::iterator it = mListeners.find(aModuleId); + if (it != mListeners.end() /*&& it->second == aListener*/) + { + delete it->second; + mListeners.erase(it); + LOG1(EJavaComms, EInfo, "Unregistered listener for module id %d", aModuleId); + } + else + { + rc = ENOENT; // not found + ELOG2(EJavaComms, "%s failed, listener does not exists %d", __PRETTY_FUNCTION__, aModuleId); + } + + return rc; +} + +OS_EXPORT int CommsEndpoint::registerDefaultListener(CommsListener* aListener) +{ +// JELOG2(EJavaComms); + ScopedLock lock(mListenersMutex); + if (!aListener) return EINVAL; + + int rc = 0; + + if (!mDefaultListener) + { + mDefaultListener = new ListenerWrapper(aListener); + LOG(EJavaComms, EInfo, "Default listener registered"); + } + else + { + rc = EEXIST; // already exists + ELOG1(EJavaComms, "%s failed, default listener already exists", __PRETTY_FUNCTION__); + } + return rc; +} + +OS_EXPORT int CommsEndpoint::unregisterDefaultListener(CommsListener* aListener) +{ +// JELOG2(EJavaComms); + ScopedLock lock(mListenersMutex); + + int rc = 0; + + if (mDefaultListener && mDefaultListener->getListener() == aListener) + { + delete mDefaultListener; + mDefaultListener = 0; + LOG(EJavaComms, EInfo, "Default listener unregistered"); + } + else + { + rc = ENOENT; // not found + ELOG2(EJavaComms, "%s failed, errno = %d", __PRETTY_FUNCTION__, rc); + } + + return rc; +} + +int CommsEndpoint::isSendReceiveMessage(CommsMessage& aMessage) +{ +// JELOG2(EJavaComms); + + pthread_mutex_lock(&mSendReceiveMutex); + + int handled = 0; + sendReceiveListeners_t::iterator it = mSendReceiveListeners.find(aMessage.getMessageRef()); + + if (it != mSendReceiveListeners.end()) + { + sendReceiveMessage_t sync = (it->second); + + *sync.p_message = aMessage; + *sync.p_done = 1; + *sync.p_result = 0; + + pthread_cond_broadcast(&mSendReceiveCondition); // wake sendReceive threads + handled = 1; + } + + pthread_mutex_unlock(&mSendReceiveMutex); + + return handled; +} + +OS_EXPORT int CommsEndpoint::sendReceive(CommsMessage& aMessage, CommsMessage& aReceivedMessage, int aTimeoutInSecs) +{ +#ifdef __SYMBIAN32__ + // Workaround for OpenC max timeout value limitation. + // If OpenC limit is exceeded then EINVAL would be returned from pthread_cond_timedwait() + const int MAX_TIME = 2148; + if (aTimeoutInSecs > MAX_TIME) + { + WLOG3(EJavaComms, "%s timeout value too big, changing %d to %d", __PRETTY_FUNCTION__, aTimeoutInSecs, MAX_TIME); + aTimeoutInSecs = MAX_TIME; + } +#endif // __SYMBIAN32__ + +// JELOG2(EJavaComms); + if (aTimeoutInSecs < 1 && aTimeoutInSecs != WAIT_FOR_EVER) + { + ELOG2(EJavaComms, "%s failed, invalid timeout value = %d", __PRETTY_FUNCTION__, aTimeoutInSecs); + return EINVAL; + } + + pthread_mutex_lock(&mSendReceiveMutex); + + int messageRef = generateMessageReference(); + aMessage.setMessageRef(messageRef); + int rc = send(aMessage); + + if (rc != 0) + { + pthread_mutex_unlock(&mSendReceiveMutex); + ELOG2(EJavaComms, "%s failed, errno = %d", __PRETTY_FUNCTION__, rc); + return rc; + } + + int done = 0; + int result = 0; + sendReceiveMessage_t msg; + msg.p_message = &aReceivedMessage; + msg.p_done = &done; + msg.p_result = &result; + + mSendReceiveListeners.insert(std::make_pair(messageRef, msg)); + + // specify duration for timeout + timeval curtime; + rc = gettimeofday(&curtime, 0); + + timespec timeout; + timeout.tv_sec = curtime.tv_sec; + timeout.tv_nsec = curtime.tv_usec * 1000; + timeout.tv_sec += aTimeoutInSecs; + + // loop used, since signal might stop the wait before the timeout + while (!done) + { + if (aTimeoutInSecs == WAIT_FOR_EVER) + { + rc = pthread_cond_wait(&mSendReceiveCondition, &mSendReceiveMutex); + } + else + { + rc = pthread_cond_timedwait(&mSendReceiveCondition, &mSendReceiveMutex, &timeout); + } + switch (rc) + { + case 0: // cond. variable being signaled + if (done) + { + mSendReceiveListeners.erase(messageRef); + rc = result; + } + break; + + case ETIMEDOUT: + mSendReceiveListeners.erase(messageRef); + done = 1; + break; + + default: + ELOG2(EJavaComms, "pthread_cond_timedwait failed %d - %s", rc, strerror(rc)); + break; + } + } + + if (rc) + { + ELOG3(EJavaComms, "%s failed %d - %s", __PRETTY_FUNCTION__ , rc, strerror(rc)); + } + + pthread_mutex_unlock(&mSendReceiveMutex); + return rc; +} + +OS_EXPORT int CommsEndpoint::generateMessageReference() +{ +// JELOG2(EJavaComms); + ScopedLock lock(mMessageRefMutex); + if (mMessageRefCounter == 0) + { + // valid msgRef can't be zero, CommsMessage used zero as default value + mMessageRefCounter++; + } + int messageRef = mMessageRefCounter++; + return messageRef; +} + +OS_EXPORT void CommsEndpoint::processMessage(const ipcMessage_t* aMsg) +{ + CommsMessage msg(aMsg); + handleIpcMessage(msg); +} + +OS_EXPORT void CommsEndpoint::onStart() +{ +} + +OS_EXPORT void CommsEndpoint::onExit() +{ + pthread_mutex_lock(&mSendReceiveMutex); + for (sendReceiveListeners_t::iterator iter = mSendReceiveListeners.begin(); iter != mSendReceiveListeners.end(); iter++) + { + sendReceiveMessage_t sync = (iter->second); + CommsMessage empty; + + *sync.p_message = empty; + *sync.p_done = 1; + *sync.p_result = EINTR; + } + + pthread_cond_broadcast(&mSendReceiveCondition); // wake sendReceive threads + pthread_mutex_unlock(&mSendReceiveMutex); + + // wait here until all sendReceive threads are done + while (true) + { + pthread_mutex_lock(&mSendReceiveMutex); + int listenerCount = mSendReceiveListeners.size(); + pthread_mutex_unlock(&mSendReceiveMutex); + + if (listenerCount!=0) + { + WLOG1(EJavaComms, "waiting for sendReceive listeners - count=%d", listenerCount); + usleep(50); // 0.05s + } + else + { + break; + } + } + + if (mVm) + { + mVm->DetachCurrentThread(); + mVm=0; + mJNIEnv=0; + } +} + +OS_EXPORT int CommsEndpoint::registerJavaListener(int aModuleId, jobject aListener, JNIEnv* aEnv) +{ +// JELOG2(EJavaComms); + ScopedLock lock(mListenersMutex); + int rc = 0; + + if (mListeners.find(aModuleId) != mListeners.end()) + { + rc = EEXIST; // already exists + ELOG2(EJavaComms, "%s failed, listener already exists %d", __PRETTY_FUNCTION__, aModuleId); + } + else + { + ListenerWrapper* listener = new ListenerWrapper(aListener, aEnv); + mListeners.insert(std::make_pair(aModuleId, listener)); + LOG1(EJavaComms, EInfo, "Registered java listener for module id %d", aModuleId); + } + + return rc; +} + +OS_EXPORT int CommsEndpoint::unregisterJavaListener(int aModuleId, jobject, JNIEnv* aEnv) +{ +// JELOG2(EJavaComms); + ScopedLock lock(mListenersMutex); + int rc = 0; + + listeners_t::iterator it = mListeners.find(aModuleId); + if (it != mListeners.end()) + { + (it->second)->release(aEnv); + delete it->second; + mListeners.erase(it); + LOG1(EJavaComms, EInfo, "Unregistered java listener for module id %d", aModuleId); + } + else + { + rc = ENOENT; // not found + ELOG2(EJavaComms, "%s failed, listener does not exists %d", __PRETTY_FUNCTION__, aModuleId); + } + + return rc; +} + +OS_EXPORT int CommsEndpoint::registerDefaultJavaListener(jobject aListener, JNIEnv* aEnv) +{ + ScopedLock lock(mListenersMutex); + int rc = 0; + + if (!mDefaultListener) + { + mDefaultListener = new ListenerWrapper(aListener, aEnv); + LOG(EJavaComms, EInfo, "Registered default java listener"); + } + else + { + rc = EEXIST; // already exists + ELOG1(EJavaComms, "%s failed, default listener already exists", __PRETTY_FUNCTION__); + } + return rc; +} + +OS_EXPORT int CommsEndpoint::unregisterDefaultJavaListener(jobject, JNIEnv*) +{ + ScopedLock lock(mListenersMutex); + int rc = 0; + + if (mDefaultListener && mDefaultListener->getListener() == 0) + { + delete mDefaultListener; + mDefaultListener = 0; + LOG(EJavaComms, EInfo, "Unregistered default java listener"); + } + else + { + rc = ENOENT; // not found + ELOG2(EJavaComms, "%s failed, errno = %d", __PRETTY_FUNCTION__, rc); + } + return rc; +} + + +OS_EXPORT int CommsEndpoint::attachToVm(JNIEnv* aEnv) +{ +// JELOG2(EJavaComms); + ScopedLock lock(mListenersMutex); + + int rc = aEnv->GetJavaVM(&mVm); + if (rc != 0) + { + ELOG2(EJavaComms, "%s failed, rc = %d", __PRETTY_FUNCTION__, rc); + } + + return rc; +} + +OS_EXPORT int CommsEndpoint::detachFromVm() +{ +// JELOG2(EJavaComms); + ScopedLock lock(mListenersMutex); + + // remove java listeners + if (mDefaultListener && mDefaultListener->getListener() == 0) + { + delete mDefaultListener; + mDefaultListener = 0; + LOG(EJavaComms, EInfo, "Removed default java listener (detach)"); + } + + for (listeners_t::iterator it = mListeners.begin(); it != mListeners.end();) + { + if ((it->second)->getListener() == 0) + { + LOG1(EJavaComms, EInfo, "Removed java listener for module id %d (detach)", it->first); + + (it->second)->release(mJNIEnv); + delete it->second; + mListeners.erase(it); + it = mListeners.begin(); + } + else + { + it++; + } + } + return 0; +} + +} // namespace comms +} // namespace java +