WebCore/websockets/WorkerThreadableWebSocketChannel.cpp
changeset 0 4f2f89ce4247
equal deleted inserted replaced
-1:000000000000 0:4f2f89ce4247
       
     1 /*
       
     2  * Copyright (C) 2009, 2010 Google Inc.  All rights reserved.
       
     3  *
       
     4  * Redistribution and use in source and binary forms, with or without
       
     5  * modification, are permitted provided that the following conditions are
       
     6  * met:
       
     7  *
       
     8  *     * Redistributions of source code must retain the above copyright
       
     9  * notice, this list of conditions and the following disclaimer.
       
    10  *     * Redistributions in binary form must reproduce the above
       
    11  * copyright notice, this list of conditions and the following disclaimer
       
    12  * in the documentation and/or other materials provided with the
       
    13  * distribution.
       
    14  *     * Neither the name of Google Inc. nor the names of its
       
    15  * contributors may be used to endorse or promote products derived from
       
    16  * this software without specific prior written permission.
       
    17  *
       
    18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
       
    19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
       
    20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
       
    21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
       
    22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
       
    23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
       
    24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
       
    25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
       
    26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
       
    27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
       
    28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
       
    29  */
       
    30 
       
    31 #include "config.h"
       
    32 
       
    33 #if ENABLE(WEB_SOCKETS) && ENABLE(WORKERS)
       
    34 
       
    35 #include "WorkerThreadableWebSocketChannel.h"
       
    36 
       
    37 #include "CrossThreadTask.h"
       
    38 #include "PlatformString.h"
       
    39 #include "ScriptExecutionContext.h"
       
    40 #include "ThreadableWebSocketChannelClientWrapper.h"
       
    41 #include "WebSocketChannel.h"
       
    42 #include "WebSocketChannelClient.h"
       
    43 #include "WorkerContext.h"
       
    44 #include "WorkerLoaderProxy.h"
       
    45 #include "WorkerRunLoop.h"
       
    46 #include "WorkerThread.h"
       
    47 
       
    48 #include <wtf/PassRefPtr.h>
       
    49 
       
    50 namespace WebCore {
       
    51 
       
    52 WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerContext* context, WebSocketChannelClient* client, const String& taskMode, const KURL& url, const String& protocol)
       
    53     : m_workerContext(context)
       
    54     , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(client))
       
    55     , m_bridge(Bridge::create(m_workerClientWrapper, m_workerContext, taskMode, url, protocol))
       
    56 {
       
    57 }
       
    58 
       
    59 WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel()
       
    60 {
       
    61     if (m_bridge)
       
    62         m_bridge->disconnect();
       
    63 }
       
    64 
       
    65 void WorkerThreadableWebSocketChannel::connect()
       
    66 {
       
    67     if (m_bridge)
       
    68         m_bridge->connect();
       
    69 }
       
    70 
       
    71 bool WorkerThreadableWebSocketChannel::send(const String& message)
       
    72 {
       
    73     if (!m_bridge)
       
    74         return false;
       
    75     return m_bridge->send(message);
       
    76 }
       
    77 
       
    78 unsigned long WorkerThreadableWebSocketChannel::bufferedAmount() const
       
    79 {
       
    80     if (!m_bridge)
       
    81         return 0;
       
    82     return m_bridge->bufferedAmount();
       
    83 }
       
    84 
       
    85 void WorkerThreadableWebSocketChannel::close()
       
    86 {
       
    87     if (m_bridge)
       
    88         m_bridge->close();
       
    89 }
       
    90 
       
    91 void WorkerThreadableWebSocketChannel::disconnect()
       
    92 {
       
    93     m_bridge->disconnect();
       
    94     m_bridge.clear();
       
    95 }
       
    96 
       
    97 void WorkerThreadableWebSocketChannel::suspend()
       
    98 {
       
    99     m_workerClientWrapper->suspend();
       
   100     if (m_bridge)
       
   101         m_bridge->suspend();
       
   102 }
       
   103 
       
   104 void WorkerThreadableWebSocketChannel::resume()
       
   105 {
       
   106     m_workerClientWrapper->resume();
       
   107     if (m_bridge)
       
   108         m_bridge->resume();
       
   109 }
       
   110 
       
   111 WorkerThreadableWebSocketChannel::Peer::Peer(RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext* context, const String& taskMode, const KURL& url, const String& protocol)
       
   112     : m_workerClientWrapper(clientWrapper)
       
   113     , m_loaderProxy(loaderProxy)
       
   114     , m_mainWebSocketChannel(WebSocketChannel::create(context, this, url, protocol))
       
   115     , m_taskMode(taskMode)
       
   116 {
       
   117     ASSERT(isMainThread());
       
   118 }
       
   119 
       
   120 WorkerThreadableWebSocketChannel::Peer::~Peer()
       
   121 {
       
   122     ASSERT(isMainThread());
       
   123     if (m_mainWebSocketChannel)
       
   124         m_mainWebSocketChannel->disconnect();
       
   125 }
       
   126 
       
   127 void WorkerThreadableWebSocketChannel::Peer::connect()
       
   128 {
       
   129     ASSERT(isMainThread());
       
   130     if (!m_mainWebSocketChannel)
       
   131         return;
       
   132     m_mainWebSocketChannel->connect();
       
   133 }
       
   134 
       
   135 static void workerContextDidSend(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, bool sent)
       
   136 {
       
   137     ASSERT_UNUSED(context, context->isWorkerContext());
       
   138     workerClientWrapper->setSent(sent);
       
   139 }
       
   140 
       
   141 void WorkerThreadableWebSocketChannel::Peer::send(const String& message)
       
   142 {
       
   143     ASSERT(isMainThread());
       
   144     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
       
   145         return;
       
   146     bool sent = m_mainWebSocketChannel->send(message);
       
   147     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidSend, m_workerClientWrapper, sent), m_taskMode);
       
   148 }
       
   149 
       
   150 static void workerContextDidGetBufferedAmount(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long bufferedAmount)
       
   151 {
       
   152     ASSERT_UNUSED(context, context->isWorkerContext());
       
   153     workerClientWrapper->setBufferedAmount(bufferedAmount);
       
   154 }
       
   155 
       
   156 void WorkerThreadableWebSocketChannel::Peer::bufferedAmount()
       
   157 {
       
   158     ASSERT(isMainThread());
       
   159     if (!m_mainWebSocketChannel || !m_workerClientWrapper)
       
   160         return;
       
   161     unsigned long bufferedAmount = m_mainWebSocketChannel->bufferedAmount();
       
   162     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidGetBufferedAmount, m_workerClientWrapper, bufferedAmount), m_taskMode);
       
   163 }
       
   164 
       
   165 void WorkerThreadableWebSocketChannel::Peer::close()
       
   166 {
       
   167     ASSERT(isMainThread());
       
   168     if (!m_mainWebSocketChannel)
       
   169         return;
       
   170     m_mainWebSocketChannel->close();
       
   171     m_mainWebSocketChannel = 0;
       
   172 }
       
   173 
       
   174 void WorkerThreadableWebSocketChannel::Peer::disconnect()
       
   175 {
       
   176     ASSERT(isMainThread());
       
   177     if (!m_mainWebSocketChannel)
       
   178         return;
       
   179     m_mainWebSocketChannel->disconnect();
       
   180     m_mainWebSocketChannel = 0;
       
   181 }
       
   182 
       
   183 void WorkerThreadableWebSocketChannel::Peer::suspend()
       
   184 {
       
   185     ASSERT(isMainThread());
       
   186     if (!m_mainWebSocketChannel)
       
   187         return;
       
   188     m_mainWebSocketChannel->suspend();
       
   189 }
       
   190 
       
   191 void WorkerThreadableWebSocketChannel::Peer::resume()
       
   192 {
       
   193     ASSERT(isMainThread());
       
   194     if (!m_mainWebSocketChannel)
       
   195         return;
       
   196     m_mainWebSocketChannel->resume();
       
   197 }
       
   198 
       
   199 static void workerContextDidConnect(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
       
   200 {
       
   201     ASSERT_UNUSED(context, context->isWorkerContext());
       
   202     workerClientWrapper->didConnect();
       
   203 }
       
   204 
       
   205 void WorkerThreadableWebSocketChannel::Peer::didConnect()
       
   206 {
       
   207     ASSERT(isMainThread());
       
   208     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidConnect, m_workerClientWrapper), m_taskMode);
       
   209 }
       
   210 
       
   211 static void workerContextDidReceiveMessage(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, const String& message)
       
   212 {
       
   213     ASSERT_UNUSED(context, context->isWorkerContext());
       
   214     workerClientWrapper->didReceiveMessage(message);
       
   215 }
       
   216 
       
   217 void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message)
       
   218 {
       
   219     ASSERT(isMainThread());
       
   220     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidReceiveMessage, m_workerClientWrapper, message), m_taskMode);
       
   221 }
       
   222 
       
   223 static void workerContextDidClose(ScriptExecutionContext* context, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, unsigned long unhandledBufferedAmount)
       
   224 {
       
   225     ASSERT_UNUSED(context, context->isWorkerContext());
       
   226     workerClientWrapper->didClose(unhandledBufferedAmount);
       
   227 }
       
   228 
       
   229 void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned long unhandledBufferedAmount)
       
   230 {
       
   231     ASSERT(isMainThread());
       
   232     m_mainWebSocketChannel = 0;
       
   233     m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&workerContextDidClose, m_workerClientWrapper, unhandledBufferedAmount), m_taskMode);
       
   234 }
       
   235 
       
   236 void WorkerThreadableWebSocketChannel::Bridge::setWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, Peer* peer, RefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper)
       
   237 {
       
   238     ASSERT_UNUSED(context, context->isWorkerContext());
       
   239     thisPtr->m_peer = peer;
       
   240     workerClientWrapper->setSyncMethodDone();
       
   241 }
       
   242 
       
   243 void WorkerThreadableWebSocketChannel::Bridge::mainThreadCreateWebSocketChannel(ScriptExecutionContext* context, Bridge* thisPtr, RefPtr<ThreadableWebSocketChannelClientWrapper> clientWrapper, const String& taskMode, const KURL& url, const String& protocol)
       
   244 {
       
   245     ASSERT(isMainThread());
       
   246     ASSERT_UNUSED(context, context->isDocument());
       
   247 
       
   248     Peer* peer = Peer::create(clientWrapper, thisPtr->m_loaderProxy, context, taskMode, url, protocol);
       
   249     thisPtr->m_loaderProxy.postTaskForModeToWorkerContext(createCallbackTask(&Bridge::setWebSocketChannel, thisPtr, peer, clientWrapper), taskMode);
       
   250 }
       
   251 
       
   252 WorkerThreadableWebSocketChannel::Bridge::Bridge(PassRefPtr<ThreadableWebSocketChannelClientWrapper> workerClientWrapper, PassRefPtr<WorkerContext> workerContext, const String& taskMode, const KURL& url, const String& protocol)
       
   253     : m_workerClientWrapper(workerClientWrapper)
       
   254     , m_workerContext(workerContext)
       
   255     , m_loaderProxy(m_workerContext->thread()->workerLoaderProxy())
       
   256     , m_taskMode(taskMode)
       
   257     , m_peer(0)
       
   258 {
       
   259     ASSERT(m_workerClientWrapper.get());
       
   260     setMethodNotCompleted();
       
   261     m_loaderProxy.postTaskToLoader(createCallbackTask(&Bridge::mainThreadCreateWebSocketChannel, this, m_workerClientWrapper, m_taskMode, url, protocol));
       
   262     waitForMethodCompletion();
       
   263     ASSERT(m_peer);
       
   264 }
       
   265 
       
   266 WorkerThreadableWebSocketChannel::Bridge::~Bridge()
       
   267 {
       
   268     disconnect();
       
   269 }
       
   270 
       
   271 void WorkerThreadableWebSocketChannel::mainThreadConnect(ScriptExecutionContext* context, Peer* peer)
       
   272 {
       
   273     ASSERT(isMainThread());
       
   274     ASSERT_UNUSED(context, context->isDocument());
       
   275     ASSERT(peer);
       
   276 
       
   277     peer->connect();
       
   278 }
       
   279 
       
   280 void WorkerThreadableWebSocketChannel::Bridge::connect()
       
   281 {
       
   282     ASSERT(m_workerClientWrapper);
       
   283     ASSERT(m_peer);
       
   284     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadConnect, m_peer));
       
   285 }
       
   286 
       
   287 void WorkerThreadableWebSocketChannel::mainThreadSend(ScriptExecutionContext* context, Peer* peer, const String& message)
       
   288 {
       
   289     ASSERT(isMainThread());
       
   290     ASSERT_UNUSED(context, context->isDocument());
       
   291     ASSERT(peer);
       
   292 
       
   293     peer->send(message);
       
   294 }
       
   295 
       
   296 bool WorkerThreadableWebSocketChannel::Bridge::send(const String& message)
       
   297 {
       
   298     if (!m_workerClientWrapper)
       
   299         return false;
       
   300     ASSERT(m_peer);
       
   301     setMethodNotCompleted();
       
   302     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSend, m_peer, message));
       
   303     RefPtr<Bridge> protect(this);
       
   304     waitForMethodCompletion();
       
   305     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
       
   306     return clientWrapper && clientWrapper->sent();
       
   307 }
       
   308 
       
   309 void WorkerThreadableWebSocketChannel::mainThreadBufferedAmount(ScriptExecutionContext* context, Peer* peer)
       
   310 {
       
   311     ASSERT(isMainThread());
       
   312     ASSERT_UNUSED(context, context->isDocument());
       
   313     ASSERT(peer);
       
   314 
       
   315     peer->bufferedAmount();
       
   316 }
       
   317 
       
   318 unsigned long WorkerThreadableWebSocketChannel::Bridge::bufferedAmount()
       
   319 {
       
   320     if (!m_workerClientWrapper)
       
   321         return 0;
       
   322     ASSERT(m_peer);
       
   323     setMethodNotCompleted();
       
   324     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadBufferedAmount, m_peer));
       
   325     RefPtr<Bridge> protect(this);
       
   326     waitForMethodCompletion();
       
   327     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
       
   328     if (clientWrapper)
       
   329         return clientWrapper->bufferedAmount();
       
   330     return 0;
       
   331 }
       
   332 
       
   333 void WorkerThreadableWebSocketChannel::mainThreadClose(ScriptExecutionContext* context, Peer* peer)
       
   334 {
       
   335     ASSERT(isMainThread());
       
   336     ASSERT_UNUSED(context, context->isDocument());
       
   337     ASSERT(peer);
       
   338 
       
   339     peer->close();
       
   340 }
       
   341 
       
   342 void WorkerThreadableWebSocketChannel::Bridge::close()
       
   343 {
       
   344     ASSERT(m_peer);
       
   345     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadClose, m_peer));
       
   346 }
       
   347 
       
   348 void WorkerThreadableWebSocketChannel::mainThreadDestroy(ScriptExecutionContext* context, Peer* peer)
       
   349 {
       
   350     ASSERT(isMainThread());
       
   351     ASSERT_UNUSED(context, context->isDocument());
       
   352     ASSERT(peer);
       
   353 
       
   354     delete peer;
       
   355 }
       
   356 
       
   357 void WorkerThreadableWebSocketChannel::Bridge::disconnect()
       
   358 {
       
   359     clearClientWrapper();
       
   360     if (m_peer) {
       
   361         Peer* peer = m_peer;
       
   362         m_peer = 0;
       
   363         m_loaderProxy.postTaskToLoader(createCallbackTask(&mainThreadDestroy, peer));
       
   364     }
       
   365     m_workerContext = 0;
       
   366 }
       
   367 
       
   368 void WorkerThreadableWebSocketChannel::mainThreadSuspend(ScriptExecutionContext* context, Peer* peer)
       
   369 {
       
   370     ASSERT(isMainThread());
       
   371     ASSERT_UNUSED(context, context->isDocument());
       
   372     ASSERT(peer);
       
   373 
       
   374     peer->suspend();
       
   375 }
       
   376 
       
   377 void WorkerThreadableWebSocketChannel::Bridge::suspend()
       
   378 {
       
   379     ASSERT(m_peer);
       
   380     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadSuspend, m_peer));
       
   381 }
       
   382 
       
   383 void WorkerThreadableWebSocketChannel::mainThreadResume(ScriptExecutionContext* context, Peer* peer)
       
   384 {
       
   385     ASSERT(isMainThread());
       
   386     ASSERT_UNUSED(context, context->isDocument());
       
   387     ASSERT(peer);
       
   388 
       
   389     peer->resume();
       
   390 }
       
   391 
       
   392 void WorkerThreadableWebSocketChannel::Bridge::resume()
       
   393 {
       
   394     ASSERT(m_peer);
       
   395     m_loaderProxy.postTaskToLoader(createCallbackTask(&WorkerThreadableWebSocketChannel::mainThreadResume, m_peer));
       
   396 }
       
   397 
       
   398 void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper()
       
   399 {
       
   400     m_workerClientWrapper->clearClient();
       
   401 }
       
   402 
       
   403 void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted()
       
   404 {
       
   405     ASSERT(m_workerClientWrapper);
       
   406     m_workerClientWrapper->clearSyncMethodDone();
       
   407 }
       
   408 
       
   409 // Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end,
       
   410 // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference.
       
   411 void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion()
       
   412 {
       
   413     if (!m_workerContext)
       
   414         return;
       
   415     WorkerRunLoop& runLoop = m_workerContext->thread()->runLoop();
       
   416     MessageQueueWaitResult result = MessageQueueMessageReceived;
       
   417     ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.get();
       
   418     while (m_workerContext && clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) {
       
   419         result = runLoop.runInMode(m_workerContext.get(), m_taskMode); // May cause this bridge to get disconnected, which makes m_workerContext become null.
       
   420         clientWrapper = m_workerClientWrapper.get();
       
   421     }
       
   422 }
       
   423 
       
   424 } // namespace WebCore
       
   425 
       
   426 #endif // ENABLE(WEB_SOCKETS)