diff -r f5050f1da672 -r 04becd199f91 javacommons/comms/ipclib/socket/src/serversocketconnection.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/javacommons/comms/ipclib/socket/src/serversocketconnection.cpp Tue Apr 27 16:30:29 2010 +0300 @@ -0,0 +1,251 @@ +/* +* Copyright (c) 2008 Nokia Corporation and/or its subsidiary(-ies). +* All rights reserved. +* This component and the accompanying materials are made available +* under the terms of "Eclipse Public License v1.0" +* which accompanies this distribution, and is available +* at the URL "http://www.eclipse.org/legal/epl-v10.html". +* +* Initial Contributors: +* Nokia Corporation - initial contribution. +* +* Contributors: +* +* Description: ?Description +* +*/ + +#include +#include +#include +#include + +#include "logger.h" + +#include "serversocketconnection.h" +#include "socketconnection.h" + +const unsigned int INVALID_THREAD_ID = 0; + +namespace java +{ +namespace comms +{ +using java::util::ScopedLock; + +ServerSocketConnection::ServerSocketConnection(IpcListener* aListener) + : mThreadId(INVALID_THREAD_ID), mListener(aListener), mKeepRunning(0), mPort(0) +{ + /* + * The SIGPIPE signal will be received if the peer has gone away + * and an attempt is made to write data to the peer. Ignoring this + * signal causes the write operation to receive an EPIPE error. + * Thus, the user is informed about what happened. + */ +#ifndef __SYMBIAN32__ + signal(SIGPIPE, SIG_IGN); +#endif +} + +ServerSocketConnection::~ServerSocketConnection() +{ +} + +int ServerSocketConnection::start(int aPort) +{ + if (mThreadId != INVALID_THREAD_ID) return 0; + mPort = aPort; + int rc = mListenSocket.open(aPort); + + if (!rc) + { + mKeepRunning = 1; + rc = pthread_create(&mThreadId, 0, ServerSocketConnection::messageLoop, this); + if (rc!=0) + { + mThreadId = INVALID_THREAD_ID; + ELOG1(EJavaComms, "ServerSocketConnection::start(): pthread_create failed, errno = %d", rc); + } + } + + return rc; +} + +void* ServerSocketConnection::messageLoop(void* params) +{ + ServerSocketConnection* me = reinterpret_cast(params); + me->mListener->onStart(); + LOG1(EJavaComms, EInfo, "Server started on %d", me->mPort); + me->select(); + me->removeConnections(); + LOG1(EJavaComms, EInfo, "Server stopped on %d", me->mPort); + me->mListener->onExit(); + return 0; +} + + +void ServerSocketConnection::stop() +{ + mKeepRunning = 0; + + if (mThreadId != INVALID_THREAD_ID) + { + // kick select out from wait + Socket s; + s.open(mPort); + s.close(); + + int rc = pthread_join(mThreadId, 0); + if (rc) + { + ELOG1(EJavaComms, "ServerSocketConnection::stop(): pthread_join failed, errno = %d", rc); + } + } + + mListenSocket.close(); + removeConnections(); + mThreadId = INVALID_THREAD_ID; + mPort = 0; +} + +int ServerSocketConnection::select() +{ + while (mKeepRunning) + { + fd_set socks; + int maxfd = 0; + + createSelectSet(socks, maxfd); + + timeval tv; + tv.tv_sec = 600; + tv.tv_usec = 0; + + int rc = ::select(maxfd + 1, &socks, 0, 0, &tv); + + if (rc == 0) + { +// LOG(EJavaComms, EInfo, "select() timed out!"); + continue; + } + else if (rc < 0 && errno != EINTR) + { + ELOG1(EJavaComms, "Error in select(): %s", strerror(errno)); + break; + } + else if (rc > 0) + { + rc = handleNewConnection(socks); + if (rc<0) break; // bail out if accept fails + handleRead(socks); + } + } + return 0; +} + +int ServerSocketConnection::handleNewConnection(fd_set& aSet) +{ + int peersock = 0; + if (FD_ISSET(mListenSocket.getSocket(), &aSet)) + { + peersock = mListenSocket.accept(); + if (peersock < 0) + { + ELOG1(EJavaComms, "Error in accept(): %s", strerror(errno)); + } + else + { + ScopedLock lock(mSocketsMutex); + SocketConnection* con = new SocketConnection(mListener, peersock); + mSockets.push_back(con); + LOG3(EJavaComms, EInfo, "Server %d - client %d connected - clients %d", + mPort, con->getSocket(), mSockets.size()); + } + } + return peersock; +} + + +void ServerSocketConnection::handleRead(fd_set& aSet) +{ + ScopedLock lock(mSocketsMutex); + + for (connections_t::iterator it = mSockets.begin(); it != mSockets.end();) + { + if (FD_ISSET((*it)->getSocket(), &aSet)) + { + int rc = (*it)->receive(); + + if (!rc) + { + ++it; + } + else + { + LOG3(EJavaComms, EInfo, "Server %d - client %d disconnected - clients %d", + mPort, (*it)->getSocket(), mSockets.size()-1); + (*it)->disconnect(); + delete(*it); + it = mSockets.erase(it); + } + } + else + { + ++it; + } + } +} + +void ServerSocketConnection::createSelectSet(fd_set& aSet, int& aMaxFd) +{ + ScopedLock lock(mSocketsMutex); + + FD_ZERO(&aSet); + aMaxFd = mListenSocket.getSocket(); + + // add listen socket + FD_SET(mListenSocket.getSocket() ,&aSet); + + // add read sockets + for (connections_t::iterator it = mSockets.begin(); it != mSockets.end(); ++it) + { + int sock_fd = (*it)->getSocket(); + FD_SET(sock_fd, &aSet); + + aMaxFd = (aMaxFd < sock_fd)?sock_fd:aMaxFd; + } +} + +int ServerSocketConnection::send(ipcMessage_t* aMsg) +{ + ScopedLock lock(mSocketsMutex); + + int rc = EINVAL; + for (connections_t::iterator it = mSockets.begin(); it != mSockets.end(); ++it) + { + if (aMsg->ipcHeader.receiver == (*it)->getSocket()) + { + rc = (*it)->send(aMsg); + break; + } + } + return rc; +} + +void ServerSocketConnection::removeConnections() +{ + ScopedLock lock(mSocketsMutex); + + for (connections_t::iterator it = mSockets.begin(); it != mSockets.end();) + { + (*it)->disconnect(); + LOG3(EJavaComms, EInfo, "Server %d stopping - client %d disconnected - clients %d", + mPort, (*it)->getSocket(), mSockets.size()-1); + delete(*it); + it = mSockets.erase(it); + } +} + +} // namespace comms +} // namespace java +