org.chromium.sdk/src/org/chromium/sdk/internal/transport/SocketConnection.java
changeset 2 e4420d2515f1
child 52 f577ea64429e
equal deleted inserted replaced
1:ef76fc2ac88c 2:e4420d2515f1
       
     1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
       
     2 // Use of this source code is governed by a BSD-style license that can be
       
     3 // found in the LICENSE file.
       
     4 
       
     5 package org.chromium.sdk.internal.transport;
       
     6 
       
     7 import java.io.BufferedReader;
       
     8 import java.io.BufferedWriter;
       
     9 import java.io.IOException;
       
    10 import java.io.InputStreamReader;
       
    11 import java.io.OutputStreamWriter;
       
    12 import java.io.Reader;
       
    13 import java.io.Writer;
       
    14 import java.net.Socket;
       
    15 import java.net.SocketAddress;
       
    16 import java.util.concurrent.BlockingQueue;
       
    17 import java.util.concurrent.LinkedBlockingQueue;
       
    18 import java.util.concurrent.atomic.AtomicBoolean;
       
    19 import java.util.logging.Level;
       
    20 import java.util.logging.Logger;
       
    21 
       
    22 import org.chromium.sdk.ConnectionLogger;
       
    23 import org.chromium.sdk.internal.transport.Message.MalformedMessageException;
       
    24 
       
    25 /**
       
    26  * The low-level network agent handling the reading and writing of Messages
       
    27  * using the debugger socket.
       
    28  *
       
    29  * This class is thread-safe.
       
    30  */
       
    31 public class SocketConnection implements Connection {
       
    32 
       
    33   /**
       
    34    * A thread that can be gracefully interrupted by a third party.
       
    35    * <p>
       
    36    * Unfortunately there is no standard way of interrupting I/O in Java. See Bug #4514257
       
    37    * on Java Bug Database (http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4514257).
       
    38    */
       
    39   private static abstract class InterruptibleThread extends Thread {
       
    40 
       
    41     protected volatile boolean isTerminated = false;
       
    42 
       
    43     InterruptibleThread(String name) {
       
    44       super(name);
       
    45     }
       
    46 
       
    47     @Override
       
    48     public synchronized void start() {
       
    49       this.isTerminated = false;
       
    50       super.start();
       
    51     }
       
    52 
       
    53     @Override
       
    54     public synchronized void interrupt() {
       
    55       this.isTerminated = true;
       
    56       super.interrupt();
       
    57     }
       
    58   }
       
    59 
       
    60   /**
       
    61    * Character encoding used in the socket data interchange.
       
    62    */
       
    63   private static final String SOCKET_CHARSET = "UTF-8";
       
    64 
       
    65   /**
       
    66    * A thread writing client-supplied messages into the debugger socket.
       
    67    */
       
    68   private class WriterThread extends InterruptibleThread {
       
    69 
       
    70     private final BufferedWriter writer;
       
    71 
       
    72     public WriterThread(BufferedWriter writer) {
       
    73       super("WriterThread");
       
    74       this.writer = writer;
       
    75     }
       
    76 
       
    77     @Override
       
    78     public void run() {
       
    79       while (!isTerminated && isAttached.get()) {
       
    80         try {
       
    81           handleOutboundMessage(outboundQueue.take());
       
    82         } catch (InterruptedException e) {
       
    83           // interrupt called on this thread, exit on isTerminated
       
    84         }
       
    85       }
       
    86     }
       
    87 
       
    88     private void handleOutboundMessage(Message message) {
       
    89       try {
       
    90         LOGGER.log(Level.FINER, "-->{0}", message);
       
    91         message.sendThrough(writer);
       
    92       } catch (IOException e) {
       
    93         SocketConnection.this.shutdown(e, false);
       
    94       }
       
    95     }
       
    96   }
       
    97 
       
    98   private static abstract class MessageItem {
       
    99     abstract void report(NetListener listener);
       
   100     abstract boolean isEos();
       
   101   }
       
   102   private static final MessageItem EOS = new MessageItem() {
       
   103     @Override
       
   104     void report(NetListener listener) {
       
   105       LOGGER.log(Level.FINER, "<--EOS");
       
   106       listener.eosReceived();
       
   107     }
       
   108     @Override
       
   109     boolean isEos() {
       
   110       return true;
       
   111     }
       
   112   };
       
   113   private static class RegularMessageItem extends MessageItem {
       
   114     private final Message message;
       
   115     RegularMessageItem(Message message) {
       
   116       this.message = message;
       
   117     }
       
   118     @Override
       
   119     void report(NetListener listener) {
       
   120       LOGGER.log(Level.FINER, "<--{0}", message);
       
   121       listener.messageReceived(message);
       
   122     }
       
   123     @Override
       
   124     boolean isEos() {
       
   125       return false;
       
   126     }
       
   127   }
       
   128 
       
   129   /**
       
   130    * A thread reading data from the debugger socket.
       
   131    */
       
   132   private class ReaderThread extends InterruptibleThread {
       
   133 
       
   134     private final BufferedReader reader;
       
   135     private final Writer handshakeWriter;
       
   136 
       
   137     public ReaderThread(BufferedReader reader, Writer handshakeWriter) {
       
   138       super("ReaderThread");
       
   139       this.reader = reader;
       
   140       this.handshakeWriter = handshakeWriter;
       
   141     }
       
   142 
       
   143     @Override
       
   144     public void run() {
       
   145       Exception breakException;
       
   146       try {
       
   147         /** The thread that dispatches the inbound messages (to avoid queue growth.) */
       
   148         startResponseDispatcherThread();
       
   149 
       
   150         if (connectionLogger != null) {
       
   151           connectionLogger.start();
       
   152         }
       
   153 
       
   154         handshaker.perform(reader, handshakeWriter);
       
   155 
       
   156         startWriterThread();
       
   157 
       
   158         while (!isTerminated && isAttached.get()) {
       
   159           Message message;
       
   160           try {
       
   161             message = Message.fromBufferedReader(reader);
       
   162           } catch (MalformedMessageException e) {
       
   163             LOGGER.log(Level.SEVERE, "Malformed protocol message", e);
       
   164             continue;
       
   165           }
       
   166           if (message == null) {
       
   167             LOGGER.fine("End of stream");
       
   168             break;
       
   169           }
       
   170           inboundQueue.add(new RegularMessageItem(message));
       
   171         }
       
   172         breakException = null;
       
   173       } catch (IOException e) {
       
   174         breakException = e;
       
   175       } finally {
       
   176         inboundQueue.add(EOS);
       
   177       }
       
   178       if (!isInterrupted()) {
       
   179         SocketConnection.this.shutdown(breakException, false);
       
   180       }
       
   181     }
       
   182   }
       
   183 
       
   184   /**
       
   185    * A thread dispatching V8 responses (to avoid locking the ReaderThread.)
       
   186    */
       
   187   private class ResponseDispatcherThread extends Thread {
       
   188 
       
   189     public ResponseDispatcherThread() {
       
   190       super("ResponseDispatcherThread");
       
   191     }
       
   192 
       
   193     @Override
       
   194     public void run() {
       
   195       MessageItem messageItem;
       
   196       try {
       
   197         while (true) {
       
   198           messageItem = inboundQueue.take();
       
   199           try {
       
   200             messageItem.report(listener);
       
   201           } catch (Exception e) {
       
   202             LOGGER.log(Level.SEVERE, "Exception in message listener", e);
       
   203           }
       
   204           if (messageItem.isEos()) {
       
   205             if (connectionLogger != null) {
       
   206               connectionLogger.handleEos();
       
   207             }
       
   208             break;
       
   209           }
       
   210         }
       
   211       } catch (InterruptedException e) {
       
   212         // terminate thread
       
   213       }
       
   214     }
       
   215   }
       
   216 
       
   217   /** The class logger. */
       
   218   private static final Logger LOGGER = Logger.getLogger(SocketConnection.class.getName());
       
   219 
       
   220   /** Lameduck shutdown delay in ms. */
       
   221   private static final int LAMEDUCK_DELAY_MS = 1000;
       
   222 
       
   223   /** The input stream buffer size. */
       
   224   private static final int INPUT_BUFFER_SIZE_BYTES = 65536;
       
   225 
       
   226   private static final NetListener NULL_LISTENER = new NetListener() {
       
   227     public void connectionClosed() {
       
   228     }
       
   229 
       
   230     public void eosReceived() {
       
   231     }
       
   232 
       
   233     public void messageReceived(Message message) {
       
   234     }
       
   235   };
       
   236 
       
   237   /** Whether the agent is currently attached to a remote browser. */
       
   238   private AtomicBoolean isAttached = new AtomicBoolean(false);
       
   239 
       
   240   /** The communication socket. */
       
   241   protected Socket socket;
       
   242 
       
   243   /** The socket reader. */
       
   244   protected BufferedReader reader;
       
   245 
       
   246   /** The socket writer. */
       
   247   protected BufferedWriter writer;
       
   248 
       
   249   private final ConnectionLogger connectionLogger;
       
   250 
       
   251   /** Handshaker used to establish connection. */
       
   252   private final Handshaker handshaker;
       
   253 
       
   254   /** The listener to report network events to. */
       
   255   protected volatile NetListener listener;
       
   256 
       
   257   /** The inbound message queue. */
       
   258   protected final BlockingQueue<MessageItem> inboundQueue = new LinkedBlockingQueue<MessageItem>();
       
   259 
       
   260   /** The outbound message queue. */
       
   261   protected final BlockingQueue<Message> outboundQueue = new LinkedBlockingQueue<Message>();
       
   262 
       
   263   /** The socket endpoint. */
       
   264   private final SocketAddress socketEndpoint;
       
   265 
       
   266   /** The thread that processes the outbound queue. */
       
   267   private WriterThread writerThread;
       
   268 
       
   269   /** The thread that processes the inbound queue. */
       
   270   private ReaderThread readerThread;
       
   271 
       
   272   /** Connection attempt timeout in ms. */
       
   273   private final int connectionTimeoutMs;
       
   274 
       
   275   public SocketConnection(SocketAddress endpoint, int connectionTimeoutMs,
       
   276       ConnectionLogger connectionLogger, Handshaker handshaker) {
       
   277     this.socketEndpoint = endpoint;
       
   278     this.connectionTimeoutMs = connectionTimeoutMs;
       
   279     this.connectionLogger = connectionLogger;
       
   280     this.handshaker = handshaker;
       
   281   }
       
   282 
       
   283   void attach() throws IOException {
       
   284     this.socket = new Socket();
       
   285     this.socket.connect(socketEndpoint, connectionTimeoutMs);
       
   286     Writer streamWriter = new OutputStreamWriter(socket.getOutputStream(), SOCKET_CHARSET);
       
   287     Reader streamReader = new InputStreamReader(socket.getInputStream(), SOCKET_CHARSET);
       
   288 
       
   289     if (connectionLogger != null) {
       
   290       streamWriter = connectionLogger.wrapWriter(streamWriter);
       
   291       streamReader = connectionLogger.wrapReader(streamReader);
       
   292       connectionLogger.setConnectionCloser(new ConnectionLogger.ConnectionCloser() {
       
   293         public void closeConnection() {
       
   294           close();
       
   295         }
       
   296       });
       
   297     }
       
   298 
       
   299     this.writer = new BufferedWriter(streamWriter);
       
   300     this.reader = new BufferedReader(streamReader, INPUT_BUFFER_SIZE_BYTES);
       
   301     isAttached.set(true);
       
   302 
       
   303     this.readerThread = new ReaderThread(reader, writer);
       
   304     // We do not start WriterThread until handshake is done (see ReaderThread)
       
   305     this.writerThread = null;
       
   306     readerThread.setDaemon(true);
       
   307     readerThread.start();
       
   308   }
       
   309 
       
   310   void detach(boolean lameduckMode) {
       
   311     shutdown(null, lameduckMode);
       
   312   }
       
   313 
       
   314   void sendMessage(Message message) {
       
   315     outboundQueue.add(message);
       
   316   }
       
   317 
       
   318   private boolean isAttached() {
       
   319     return isAttached.get();
       
   320   }
       
   321 
       
   322   /**
       
   323    * The method is synchronized so that it does not get called
       
   324    * from the {Reader,Writer}Thread when the underlying socket is
       
   325    * closed in another invocation of this method.
       
   326    */
       
   327   private void shutdown(Exception cause, boolean lameduckMode) {
       
   328     if (!isAttached.compareAndSet(true, false)) {
       
   329       // already shut down
       
   330       return;
       
   331     }
       
   332     LOGGER.log(Level.INFO, "Shutdown requested", cause);
       
   333 
       
   334     if (lameduckMode) {
       
   335       Thread terminationThread = new Thread("ServiceThreadTerminator") {
       
   336         @Override
       
   337         public void run() {
       
   338           interruptServiceThreads();
       
   339         }
       
   340       };
       
   341       terminationThread.setDaemon(true);
       
   342       terminationThread.start();
       
   343       try {
       
   344         terminationThread.join(LAMEDUCK_DELAY_MS);
       
   345       } catch (InterruptedException e) {
       
   346         // fall through
       
   347       }
       
   348     } else {
       
   349       interruptServiceThreads();
       
   350     }
       
   351 
       
   352     try {
       
   353       socket.shutdownInput();
       
   354     } catch (IOException e) {
       
   355       // ignore
       
   356     }
       
   357     try {
       
   358       socket.shutdownOutput();
       
   359     } catch (IOException e) {
       
   360       // ignore
       
   361     }
       
   362 
       
   363     try {
       
   364       socket.close();
       
   365     } catch (IOException e) {
       
   366       // ignore
       
   367     }
       
   368     listener.connectionClosed();
       
   369   }
       
   370 
       
   371   private void interruptServiceThreads() {
       
   372     interruptThread(writerThread);
       
   373     interruptThread(readerThread);
       
   374   }
       
   375 
       
   376   private void startWriterThread() {
       
   377     if (writerThread != null) {
       
   378       throw new IllegalStateException();
       
   379     }
       
   380     writerThread = new WriterThread(writer);
       
   381     writerThread.setDaemon(true);
       
   382     writerThread.start();
       
   383   }
       
   384 
       
   385   private ResponseDispatcherThread startResponseDispatcherThread() {
       
   386     ResponseDispatcherThread dispatcherThread;
       
   387     dispatcherThread = new ResponseDispatcherThread();
       
   388     dispatcherThread.setDaemon(true);
       
   389     dispatcherThread.start();
       
   390     return dispatcherThread;
       
   391   }
       
   392 
       
   393   private void interruptThread(Thread thread) {
       
   394     try {
       
   395       if (thread != null) {
       
   396         thread.interrupt();
       
   397       }
       
   398     } catch (SecurityException e) {
       
   399       // ignore
       
   400     }
       
   401   }
       
   402 
       
   403   public void close() {
       
   404     if (isAttached()) {
       
   405       detach(true);
       
   406     }
       
   407   }
       
   408 
       
   409   public boolean isConnected() {
       
   410     return isAttached();
       
   411   }
       
   412 
       
   413   public void send(Message message) {
       
   414     checkAttached();
       
   415     sendMessage(message);
       
   416   }
       
   417 
       
   418   public void setNetListener(NetListener netListener) {
       
   419     if (this.listener != null && netListener != this.listener) {
       
   420       throw new IllegalStateException("Cannot change NetListener");
       
   421     }
       
   422     this.listener = netListener != null
       
   423         ? netListener
       
   424         : NULL_LISTENER;
       
   425   }
       
   426 
       
   427   public void start() throws IOException {
       
   428     try {
       
   429       if (!isAttached()) {
       
   430         attach();
       
   431       }
       
   432     } catch (IOException e) {
       
   433       listener.connectionClosed();
       
   434       throw e;
       
   435     }
       
   436   }
       
   437 
       
   438   private void checkAttached() {
       
   439     if (!isAttached()) {
       
   440       throw new IllegalStateException("Connection not attached");
       
   441     }
       
   442   }
       
   443 }