javacommons/comms/src/commsendpoint.cpp
changeset 21 2a9601315dfc
child 50 023eef975703
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/javacommons/comms/src/commsendpoint.cpp	Mon May 03 12:27:20 2010 +0300
@@ -0,0 +1,512 @@
+/*
+* Copyright (c) 2008 Nokia Corporation and/or its subsidiary(-ies).
+* All rights reserved.
+* This component and the accompanying materials are made available
+* under the terms of "Eclipse Public License v1.0"
+* which accompanies this distribution, and is available
+* at the URL "http://www.eclipse.org/legal/epl-v10.html".
+*
+* Initial Contributors:
+* Nokia Corporation - initial contribution.
+*
+* Contributors:
+*
+* Description:  This is an abstract base class for Comms endpoints.
+*
+*/
+
+#include <errno.h>
+#include <utility>
+#include <sys/time.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "logger.h"
+
+#include "commsendpoint.h"
+#include "commslistener.h"
+#include "commsmessage.h"
+
+#include "listeners.h"
+
+namespace java
+{
+namespace comms
+{
+using java::util::ScopedLock;
+
+OS_EXPORT CommsEndpoint::CommsEndpoint() : mDefaultListener(0), mMessageRefCounter(0), mVm(0), mJNIEnv(0)
+{
+//    JELOG2(EJavaComms);
+    pthread_mutex_init(&mSendReceiveMutex, 0);
+    pthread_cond_init(&mSendReceiveCondition, 0);
+}
+
+OS_EXPORT CommsEndpoint::~CommsEndpoint()
+{
+//    JELOG2(EJavaComms);
+
+    for (listeners_t::iterator it = mListeners.begin(); it != mListeners.end(); it++)
+    {
+        delete it->second;
+    }
+    mListeners.clear();
+
+    delete mDefaultListener;
+
+    pthread_mutex_destroy(&mSendReceiveMutex);
+    pthread_cond_destroy(&mSendReceiveCondition);
+}
+
+OS_EXPORT void CommsEndpoint::handleIpcMessage(CommsMessage& aMessage)
+{
+//    JELOG2(EJavaComms);
+
+    /* Message routing is done in following steps:
+     *  1. check if received message is reply to sendReceive method call
+     *  2. use moduleId to find correct listener
+     * */
+    if (! isSendReceiveMessage(aMessage))
+    {
+        handleMessage(aMessage);
+    }
+}
+
+
+OS_EXPORT int CommsEndpoint::handleMessage(CommsMessage& aMessage)
+{
+//    JELOG2(EJavaComms);
+    ScopedLock lock(mListenersMutex);
+
+
+    if (mVm && !mJNIEnv)
+    {
+        // attach to VM
+        mVm->AttachCurrentThread((void**)&mJNIEnv, (void *)NULL);
+    }
+
+
+    listeners_t::iterator it = mListeners.find(aMessage.getModuleId());
+
+    if (it != mListeners.end())
+    {
+        (it->second)->processMessage(aMessage, mJNIEnv);
+    }
+    else
+    {
+        if (mDefaultListener)
+        {
+            mDefaultListener->processMessage(aMessage, mJNIEnv);
+        }
+        else
+        {
+            WLOG3(EJavaComms, "CommsEndpoint::handleMessage: No handlers found, msgId=%d, moduleId=%d, messageRef=%d",
+                  aMessage.getMessageId(),
+                  aMessage.getModuleId(),
+                  aMessage.getMessageRef());
+        }
+    }
+    return 0;
+}
+
+OS_EXPORT int CommsEndpoint::registerListener(int aModuleId, CommsListener* aListener)
+{
+//    JELOG2(EJavaComms);
+    ScopedLock lock(mListenersMutex);
+    if (!aListener) return EINVAL;
+
+    int rc = 0;
+
+    if (mListeners.find(aModuleId) != mListeners.end())
+    {
+        rc = EEXIST; // already exists
+        ELOG2(EJavaComms, "%s failed, listener already exists %d", __PRETTY_FUNCTION__, aModuleId);
+    }
+    else
+    {
+        mListeners.insert(std::make_pair(aModuleId, new ListenerWrapper(aListener)));
+        LOG1(EJavaComms, EInfo, "Registered listener for module id %d", aModuleId);
+    }
+
+    return rc;
+}
+
+OS_EXPORT int CommsEndpoint::unregisterListener(int aModuleId, CommsListener* /*aListener*/)
+{
+//    JELOG2(EJavaComms);
+    ScopedLock lock(mListenersMutex);
+    int rc = 0;
+
+    listeners_t::iterator it = mListeners.find(aModuleId);
+    if (it != mListeners.end() /*&& it->second == aListener*/)
+    {
+        delete it->second;
+        mListeners.erase(it);
+        LOG1(EJavaComms, EInfo, "Unregistered listener for module id %d", aModuleId);
+    }
+    else
+    {
+        rc = ENOENT; // not found
+        ELOG2(EJavaComms, "%s failed, listener does not exists %d", __PRETTY_FUNCTION__, aModuleId);
+    }
+
+    return rc;
+}
+
+OS_EXPORT int CommsEndpoint::registerDefaultListener(CommsListener* aListener)
+{
+//    JELOG2(EJavaComms);
+    ScopedLock lock(mListenersMutex);
+    if (!aListener) return EINVAL;
+
+    int rc = 0;
+
+    if (!mDefaultListener)
+    {
+        mDefaultListener = new ListenerWrapper(aListener);
+        LOG(EJavaComms, EInfo, "Default listener registered");
+    }
+    else
+    {
+        rc = EEXIST; // already exists
+        ELOG1(EJavaComms, "%s failed, default listener already exists", __PRETTY_FUNCTION__);
+    }
+    return rc;
+}
+
+OS_EXPORT int CommsEndpoint::unregisterDefaultListener(CommsListener* aListener)
+{
+//    JELOG2(EJavaComms);
+    ScopedLock lock(mListenersMutex);
+
+    int rc = 0;
+
+    if (mDefaultListener && mDefaultListener->getListener() == aListener)
+    {
+        delete mDefaultListener;
+        mDefaultListener = 0;
+        LOG(EJavaComms, EInfo, "Default listener unregistered");
+    }
+    else
+    {
+        rc = ENOENT; // not found
+        ELOG2(EJavaComms, "%s failed, errno = %d", __PRETTY_FUNCTION__, rc);
+    }
+
+    return rc;
+}
+
+int CommsEndpoint::isSendReceiveMessage(CommsMessage& aMessage)
+{
+//    JELOG2(EJavaComms);
+
+    pthread_mutex_lock(&mSendReceiveMutex);
+
+    int handled = 0;
+    sendReceiveListeners_t::iterator it = mSendReceiveListeners.find(aMessage.getMessageRef());
+
+    if (it != mSendReceiveListeners.end())
+    {
+        sendReceiveMessage_t sync = (it->second);
+
+        *sync.p_message = aMessage;
+        *sync.p_done = 1;
+        *sync.p_result = 0;
+
+        pthread_cond_broadcast(&mSendReceiveCondition); // wake sendReceive threads
+        handled = 1;
+    }
+
+    pthread_mutex_unlock(&mSendReceiveMutex);
+
+    return handled;
+}
+
+OS_EXPORT int CommsEndpoint::sendReceive(CommsMessage& aMessage, CommsMessage& aReceivedMessage, int aTimeoutInSecs)
+{
+#ifdef __SYMBIAN32__
+    // Workaround for OpenC max timeout value limitation.
+    // If OpenC limit is exceeded then EINVAL would be returned from pthread_cond_timedwait()
+    const int MAX_TIME = 2148;
+    if (aTimeoutInSecs > MAX_TIME)
+    {
+        WLOG3(EJavaComms, "%s timeout value too big, changing %d to %d", __PRETTY_FUNCTION__, aTimeoutInSecs, MAX_TIME);
+        aTimeoutInSecs = MAX_TIME;
+    }
+#endif // __SYMBIAN32__
+
+//    JELOG2(EJavaComms);
+    if (aTimeoutInSecs < 1 && aTimeoutInSecs != WAIT_FOR_EVER)
+    {
+        ELOG2(EJavaComms, "%s failed, invalid timeout value = %d", __PRETTY_FUNCTION__, aTimeoutInSecs);
+        return EINVAL;
+    }
+
+    pthread_mutex_lock(&mSendReceiveMutex);
+
+    int messageRef = generateMessageReference();
+    aMessage.setMessageRef(messageRef);
+    int rc = send(aMessage);
+
+    if (rc != 0)
+    {
+        pthread_mutex_unlock(&mSendReceiveMutex);
+        ELOG2(EJavaComms, "%s failed, errno = %d", __PRETTY_FUNCTION__, rc);
+        return rc;
+    }
+
+    int done = 0;
+    int result = 0;
+    sendReceiveMessage_t msg;
+    msg.p_message   = &aReceivedMessage;
+    msg.p_done      = &done;
+    msg.p_result    = &result;
+
+    mSendReceiveListeners.insert(std::make_pair(messageRef, msg));
+
+    // specify duration for timeout
+    timeval  curtime;
+    rc = gettimeofday(&curtime, 0);
+
+    timespec timeout;
+    timeout.tv_sec  = curtime.tv_sec;
+    timeout.tv_nsec = curtime.tv_usec * 1000;
+    timeout.tv_sec += aTimeoutInSecs;
+
+    // loop used, since signal might stop the wait before the timeout
+    while (!done)
+    {
+        if (aTimeoutInSecs == WAIT_FOR_EVER)
+        {
+            rc = pthread_cond_wait(&mSendReceiveCondition, &mSendReceiveMutex);
+        }
+        else
+        {
+            rc = pthread_cond_timedwait(&mSendReceiveCondition, &mSendReceiveMutex, &timeout);
+        }
+        switch (rc)
+        {
+        case 0: // cond. variable being signaled
+            if (done)
+            {
+                mSendReceiveListeners.erase(messageRef);
+                rc = result;
+            }
+            break;
+
+        case ETIMEDOUT:
+            mSendReceiveListeners.erase(messageRef);
+            done = 1;
+            break;
+
+        default:
+            ELOG2(EJavaComms, "pthread_cond_timedwait failed %d - %s", rc, strerror(rc));
+            break;
+        }
+    }
+
+    if (rc)
+    {
+        ELOG3(EJavaComms, "%s failed %d - %s", __PRETTY_FUNCTION__ , rc, strerror(rc));
+    }
+
+    pthread_mutex_unlock(&mSendReceiveMutex);
+    return rc;
+}
+
+OS_EXPORT int CommsEndpoint::generateMessageReference()
+{
+//    JELOG2(EJavaComms);
+    ScopedLock lock(mMessageRefMutex);
+    if (mMessageRefCounter == 0)
+    {
+        // valid msgRef can't be zero, CommsMessage used zero as default value
+        mMessageRefCounter++;
+    }
+    int messageRef = mMessageRefCounter++;
+    return messageRef;
+}
+
+OS_EXPORT void CommsEndpoint::processMessage(const ipcMessage_t* aMsg)
+{
+    CommsMessage msg(aMsg);
+    handleIpcMessage(msg);
+}
+
+OS_EXPORT void CommsEndpoint::onStart()
+{
+}
+
+OS_EXPORT void CommsEndpoint::onExit()
+{
+    pthread_mutex_lock(&mSendReceiveMutex);
+    for (sendReceiveListeners_t::iterator iter = mSendReceiveListeners.begin(); iter != mSendReceiveListeners.end(); iter++)
+    {
+        sendReceiveMessage_t sync = (iter->second);
+        CommsMessage empty;
+
+        *sync.p_message = empty;
+        *sync.p_done = 1;
+        *sync.p_result = EINTR;
+    }
+
+    pthread_cond_broadcast(&mSendReceiveCondition); // wake sendReceive threads
+    pthread_mutex_unlock(&mSendReceiveMutex);
+
+    // wait here until all sendReceive threads are done
+    while (true)
+    {
+        pthread_mutex_lock(&mSendReceiveMutex);
+        int listenerCount = mSendReceiveListeners.size();
+        pthread_mutex_unlock(&mSendReceiveMutex);
+
+        if (listenerCount!=0)
+        {
+            WLOG1(EJavaComms, "waiting for sendReceive listeners - count=%d", listenerCount);
+            usleep(50); // 0.05s
+        }
+        else
+        {
+            break;
+        }
+    }
+
+    if (mVm)
+    {
+        mVm->DetachCurrentThread();
+        mVm=0;
+        mJNIEnv=0;
+    }
+}
+
+OS_EXPORT int CommsEndpoint::registerJavaListener(int aModuleId, jobject aListener, JNIEnv* aEnv)
+{
+//    JELOG2(EJavaComms);
+    ScopedLock lock(mListenersMutex);
+    int rc = 0;
+
+    if (mListeners.find(aModuleId) != mListeners.end())
+    {
+        rc = EEXIST; // already exists
+        ELOG2(EJavaComms, "%s failed, listener already exists %d", __PRETTY_FUNCTION__, aModuleId);
+    }
+    else
+    {
+        ListenerWrapper* listener = new ListenerWrapper(aListener, aEnv);
+        mListeners.insert(std::make_pair(aModuleId, listener));
+        LOG1(EJavaComms, EInfo, "Registered java listener for module id %d", aModuleId);
+    }
+
+    return rc;
+}
+
+OS_EXPORT int CommsEndpoint::unregisterJavaListener(int aModuleId, jobject, JNIEnv* aEnv)
+{
+//    JELOG2(EJavaComms);
+    ScopedLock lock(mListenersMutex);
+    int rc = 0;
+
+    listeners_t::iterator it = mListeners.find(aModuleId);
+    if (it != mListeners.end())
+    {
+        (it->second)->release(aEnv);
+        delete it->second;
+        mListeners.erase(it);
+        LOG1(EJavaComms, EInfo, "Unregistered java listener for module id %d", aModuleId);
+    }
+    else
+    {
+        rc = ENOENT; // not found
+        ELOG2(EJavaComms, "%s failed, listener does not exists %d", __PRETTY_FUNCTION__, aModuleId);
+    }
+
+    return rc;
+}
+
+OS_EXPORT int CommsEndpoint::registerDefaultJavaListener(jobject aListener, JNIEnv* aEnv)
+{
+    ScopedLock lock(mListenersMutex);
+    int rc = 0;
+
+    if (!mDefaultListener)
+    {
+        mDefaultListener = new ListenerWrapper(aListener, aEnv);
+        LOG(EJavaComms, EInfo, "Registered default java listener");
+    }
+    else
+    {
+        rc = EEXIST; // already exists
+        ELOG1(EJavaComms, "%s failed, default listener already exists", __PRETTY_FUNCTION__);
+    }
+    return rc;
+}
+
+OS_EXPORT int CommsEndpoint::unregisterDefaultJavaListener(jobject, JNIEnv*)
+{
+    ScopedLock lock(mListenersMutex);
+    int rc = 0;
+
+    if (mDefaultListener && mDefaultListener->getListener() == 0)
+    {
+        delete mDefaultListener;
+        mDefaultListener = 0;
+        LOG(EJavaComms, EInfo, "Unregistered default java listener");
+    }
+    else
+    {
+        rc = ENOENT; // not found
+        ELOG2(EJavaComms, "%s failed, errno = %d", __PRETTY_FUNCTION__, rc);
+    }
+    return rc;
+}
+
+
+OS_EXPORT int CommsEndpoint::attachToVm(JNIEnv* aEnv)
+{
+//    JELOG2(EJavaComms);
+    ScopedLock lock(mListenersMutex);
+
+    int rc = aEnv->GetJavaVM(&mVm);
+    if (rc != 0)
+    {
+        ELOG2(EJavaComms, "%s failed, rc = %d", __PRETTY_FUNCTION__, rc);
+    }
+
+    return rc;
+}
+
+OS_EXPORT int CommsEndpoint::detachFromVm()
+{
+//    JELOG2(EJavaComms);
+    ScopedLock lock(mListenersMutex);
+
+    // remove java listeners
+    if (mDefaultListener && mDefaultListener->getListener() == 0)
+    {
+        delete mDefaultListener;
+        mDefaultListener = 0;
+        LOG(EJavaComms, EInfo, "Removed default java listener (detach)");
+    }
+
+    for (listeners_t::iterator it = mListeners.begin(); it != mListeners.end();)
+    {
+        if ((it->second)->getListener() == 0)
+        {
+            LOG1(EJavaComms, EInfo, "Removed java listener for module id %d (detach)", it->first);
+
+            (it->second)->release(mJNIEnv);
+            delete it->second;
+            mListeners.erase(it);
+            it = mListeners.begin();
+        }
+        else
+        {
+            it++;
+        }
+    }
+    return 0;
+}
+
+} // namespace comms
+} // namespace java
+