|
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. |
|
2 // Use of this source code is governed by a BSD-style license that can be |
|
3 // found in the LICENSE file. |
|
4 |
|
5 package org.chromium.sdk.internal.transport; |
|
6 |
|
7 import java.io.BufferedReader; |
|
8 import java.io.BufferedWriter; |
|
9 import java.io.IOException; |
|
10 import java.io.InputStreamReader; |
|
11 import java.io.OutputStreamWriter; |
|
12 import java.io.Reader; |
|
13 import java.io.Writer; |
|
14 import java.net.Socket; |
|
15 import java.net.SocketAddress; |
|
16 import java.util.concurrent.BlockingQueue; |
|
17 import java.util.concurrent.LinkedBlockingQueue; |
|
18 import java.util.concurrent.atomic.AtomicBoolean; |
|
19 import java.util.logging.Level; |
|
20 import java.util.logging.Logger; |
|
21 |
|
22 import org.chromium.sdk.ConnectionLogger; |
|
23 import org.chromium.sdk.internal.transport.Message.MalformedMessageException; |
|
24 |
|
25 /** |
|
26 * The low-level network agent handling the reading and writing of Messages |
|
27 * using the debugger socket. |
|
28 * |
|
29 * This class is thread-safe. |
|
30 */ |
|
31 public class SocketConnection implements Connection { |
|
32 |
|
33 /** |
|
34 * A thread that can be gracefully interrupted by a third party. |
|
35 * <p> |
|
36 * Unfortunately there is no standard way of interrupting I/O in Java. See Bug #4514257 |
|
37 * on Java Bug Database (http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4514257). |
|
38 */ |
|
39 private static abstract class InterruptibleThread extends Thread { |
|
40 |
|
41 protected volatile boolean isTerminated = false; |
|
42 |
|
43 InterruptibleThread(String name) { |
|
44 super(name); |
|
45 } |
|
46 |
|
47 @Override |
|
48 public synchronized void start() { |
|
49 this.isTerminated = false; |
|
50 super.start(); |
|
51 } |
|
52 |
|
53 @Override |
|
54 public synchronized void interrupt() { |
|
55 this.isTerminated = true; |
|
56 super.interrupt(); |
|
57 } |
|
58 } |
|
59 |
|
60 /** |
|
61 * Character encoding used in the socket data interchange. |
|
62 */ |
|
63 private static final String SOCKET_CHARSET = "UTF-8"; |
|
64 |
|
65 /** |
|
66 * A thread writing client-supplied messages into the debugger socket. |
|
67 */ |
|
68 private class WriterThread extends InterruptibleThread { |
|
69 |
|
70 private final BufferedWriter writer; |
|
71 |
|
72 public WriterThread(BufferedWriter writer) { |
|
73 super("WriterThread"); |
|
74 this.writer = writer; |
|
75 } |
|
76 |
|
77 @Override |
|
78 public void run() { |
|
79 while (!isTerminated && isAttached.get()) { |
|
80 try { |
|
81 handleOutboundMessage(outboundQueue.take()); |
|
82 } catch (InterruptedException e) { |
|
83 // interrupt called on this thread, exit on isTerminated |
|
84 } |
|
85 } |
|
86 } |
|
87 |
|
88 private void handleOutboundMessage(Message message) { |
|
89 try { |
|
90 LOGGER.log(Level.FINER, "-->{0}", message); |
|
91 message.sendThrough(writer); |
|
92 } catch (IOException e) { |
|
93 SocketConnection.this.shutdown(e, false); |
|
94 } |
|
95 } |
|
96 } |
|
97 |
|
98 private static abstract class MessageItem { |
|
99 abstract void report(NetListener listener); |
|
100 abstract boolean isEos(); |
|
101 } |
|
102 private static final MessageItem EOS = new MessageItem() { |
|
103 @Override |
|
104 void report(NetListener listener) { |
|
105 LOGGER.log(Level.FINER, "<--EOS"); |
|
106 listener.eosReceived(); |
|
107 } |
|
108 @Override |
|
109 boolean isEos() { |
|
110 return true; |
|
111 } |
|
112 }; |
|
113 private static class RegularMessageItem extends MessageItem { |
|
114 private final Message message; |
|
115 RegularMessageItem(Message message) { |
|
116 this.message = message; |
|
117 } |
|
118 @Override |
|
119 void report(NetListener listener) { |
|
120 LOGGER.log(Level.FINER, "<--{0}", message); |
|
121 listener.messageReceived(message); |
|
122 } |
|
123 @Override |
|
124 boolean isEos() { |
|
125 return false; |
|
126 } |
|
127 } |
|
128 |
|
129 /** |
|
130 * A thread reading data from the debugger socket. |
|
131 */ |
|
132 private class ReaderThread extends InterruptibleThread { |
|
133 |
|
134 private final BufferedReader reader; |
|
135 private final Writer handshakeWriter; |
|
136 |
|
137 public ReaderThread(BufferedReader reader, Writer handshakeWriter) { |
|
138 super("ReaderThread"); |
|
139 this.reader = reader; |
|
140 this.handshakeWriter = handshakeWriter; |
|
141 } |
|
142 |
|
143 @Override |
|
144 public void run() { |
|
145 Exception breakException; |
|
146 try { |
|
147 /** The thread that dispatches the inbound messages (to avoid queue growth.) */ |
|
148 startResponseDispatcherThread(); |
|
149 |
|
150 if (connectionLogger != null) { |
|
151 connectionLogger.start(); |
|
152 } |
|
153 |
|
154 handshaker.perform(reader, handshakeWriter); |
|
155 |
|
156 startWriterThread(); |
|
157 |
|
158 while (!isTerminated && isAttached.get()) { |
|
159 Message message; |
|
160 try { |
|
161 message = Message.fromBufferedReader(reader); |
|
162 } catch (MalformedMessageException e) { |
|
163 LOGGER.log(Level.SEVERE, "Malformed protocol message", e); |
|
164 continue; |
|
165 } |
|
166 if (message == null) { |
|
167 LOGGER.fine("End of stream"); |
|
168 break; |
|
169 } |
|
170 inboundQueue.add(new RegularMessageItem(message)); |
|
171 } |
|
172 breakException = null; |
|
173 } catch (IOException e) { |
|
174 breakException = e; |
|
175 } finally { |
|
176 inboundQueue.add(EOS); |
|
177 } |
|
178 if (!isInterrupted()) { |
|
179 SocketConnection.this.shutdown(breakException, false); |
|
180 } |
|
181 } |
|
182 } |
|
183 |
|
184 /** |
|
185 * A thread dispatching V8 responses (to avoid locking the ReaderThread.) |
|
186 */ |
|
187 private class ResponseDispatcherThread extends Thread { |
|
188 |
|
189 public ResponseDispatcherThread() { |
|
190 super("ResponseDispatcherThread"); |
|
191 } |
|
192 |
|
193 @Override |
|
194 public void run() { |
|
195 MessageItem messageItem; |
|
196 try { |
|
197 while (true) { |
|
198 messageItem = inboundQueue.take(); |
|
199 try { |
|
200 messageItem.report(listener); |
|
201 } catch (Exception e) { |
|
202 LOGGER.log(Level.SEVERE, "Exception in message listener", e); |
|
203 } |
|
204 if (messageItem.isEos()) { |
|
205 if (connectionLogger != null) { |
|
206 connectionLogger.handleEos(); |
|
207 } |
|
208 break; |
|
209 } |
|
210 } |
|
211 } catch (InterruptedException e) { |
|
212 // terminate thread |
|
213 } |
|
214 } |
|
215 } |
|
216 |
|
217 /** The class logger. */ |
|
218 private static final Logger LOGGER = Logger.getLogger(SocketConnection.class.getName()); |
|
219 |
|
220 /** Lameduck shutdown delay in ms. */ |
|
221 private static final int LAMEDUCK_DELAY_MS = 1000; |
|
222 |
|
223 /** The input stream buffer size. */ |
|
224 private static final int INPUT_BUFFER_SIZE_BYTES = 65536; |
|
225 |
|
226 private static final NetListener NULL_LISTENER = new NetListener() { |
|
227 public void connectionClosed() { |
|
228 } |
|
229 |
|
230 public void eosReceived() { |
|
231 } |
|
232 |
|
233 public void messageReceived(Message message) { |
|
234 } |
|
235 }; |
|
236 |
|
237 /** Whether the agent is currently attached to a remote browser. */ |
|
238 private AtomicBoolean isAttached = new AtomicBoolean(false); |
|
239 |
|
240 /** The communication socket. */ |
|
241 protected Socket socket; |
|
242 |
|
243 /** The socket reader. */ |
|
244 protected BufferedReader reader; |
|
245 |
|
246 /** The socket writer. */ |
|
247 protected BufferedWriter writer; |
|
248 |
|
249 private final ConnectionLogger connectionLogger; |
|
250 |
|
251 /** Handshaker used to establish connection. */ |
|
252 private final Handshaker handshaker; |
|
253 |
|
254 /** The listener to report network events to. */ |
|
255 protected volatile NetListener listener; |
|
256 |
|
257 /** The inbound message queue. */ |
|
258 protected final BlockingQueue<MessageItem> inboundQueue = new LinkedBlockingQueue<MessageItem>(); |
|
259 |
|
260 /** The outbound message queue. */ |
|
261 protected final BlockingQueue<Message> outboundQueue = new LinkedBlockingQueue<Message>(); |
|
262 |
|
263 /** The socket endpoint. */ |
|
264 private final SocketAddress socketEndpoint; |
|
265 |
|
266 /** The thread that processes the outbound queue. */ |
|
267 private WriterThread writerThread; |
|
268 |
|
269 /** The thread that processes the inbound queue. */ |
|
270 private ReaderThread readerThread; |
|
271 |
|
272 /** Connection attempt timeout in ms. */ |
|
273 private final int connectionTimeoutMs; |
|
274 |
|
275 public SocketConnection(SocketAddress endpoint, int connectionTimeoutMs, |
|
276 ConnectionLogger connectionLogger, Handshaker handshaker) { |
|
277 this.socketEndpoint = endpoint; |
|
278 this.connectionTimeoutMs = connectionTimeoutMs; |
|
279 this.connectionLogger = connectionLogger; |
|
280 this.handshaker = handshaker; |
|
281 } |
|
282 |
|
283 void attach() throws IOException { |
|
284 this.socket = new Socket(); |
|
285 this.socket.connect(socketEndpoint, connectionTimeoutMs); |
|
286 Writer streamWriter = new OutputStreamWriter(socket.getOutputStream(), SOCKET_CHARSET); |
|
287 Reader streamReader = new InputStreamReader(socket.getInputStream(), SOCKET_CHARSET); |
|
288 |
|
289 if (connectionLogger != null) { |
|
290 streamWriter = connectionLogger.wrapWriter(streamWriter); |
|
291 streamReader = connectionLogger.wrapReader(streamReader); |
|
292 connectionLogger.setConnectionCloser(new ConnectionLogger.ConnectionCloser() { |
|
293 public void closeConnection() { |
|
294 close(); |
|
295 } |
|
296 }); |
|
297 } |
|
298 |
|
299 this.writer = new BufferedWriter(streamWriter); |
|
300 this.reader = new BufferedReader(streamReader, INPUT_BUFFER_SIZE_BYTES); |
|
301 isAttached.set(true); |
|
302 |
|
303 this.readerThread = new ReaderThread(reader, writer); |
|
304 // We do not start WriterThread until handshake is done (see ReaderThread) |
|
305 this.writerThread = null; |
|
306 readerThread.setDaemon(true); |
|
307 readerThread.start(); |
|
308 } |
|
309 |
|
310 void detach(boolean lameduckMode) { |
|
311 shutdown(null, lameduckMode); |
|
312 } |
|
313 |
|
314 void sendMessage(Message message) { |
|
315 outboundQueue.add(message); |
|
316 } |
|
317 |
|
318 private boolean isAttached() { |
|
319 return isAttached.get(); |
|
320 } |
|
321 |
|
322 /** |
|
323 * The method is synchronized so that it does not get called |
|
324 * from the {Reader,Writer}Thread when the underlying socket is |
|
325 * closed in another invocation of this method. |
|
326 */ |
|
327 private void shutdown(Exception cause, boolean lameduckMode) { |
|
328 if (!isAttached.compareAndSet(true, false)) { |
|
329 // already shut down |
|
330 return; |
|
331 } |
|
332 LOGGER.log(Level.INFO, "Shutdown requested", cause); |
|
333 |
|
334 if (lameduckMode) { |
|
335 Thread terminationThread = new Thread("ServiceThreadTerminator") { |
|
336 @Override |
|
337 public void run() { |
|
338 interruptServiceThreads(); |
|
339 } |
|
340 }; |
|
341 terminationThread.setDaemon(true); |
|
342 terminationThread.start(); |
|
343 try { |
|
344 terminationThread.join(LAMEDUCK_DELAY_MS); |
|
345 } catch (InterruptedException e) { |
|
346 // fall through |
|
347 } |
|
348 } else { |
|
349 interruptServiceThreads(); |
|
350 } |
|
351 |
|
352 try { |
|
353 socket.shutdownInput(); |
|
354 } catch (IOException e) { |
|
355 // ignore |
|
356 } |
|
357 try { |
|
358 socket.shutdownOutput(); |
|
359 } catch (IOException e) { |
|
360 // ignore |
|
361 } |
|
362 |
|
363 try { |
|
364 socket.close(); |
|
365 } catch (IOException e) { |
|
366 // ignore |
|
367 } |
|
368 listener.connectionClosed(); |
|
369 } |
|
370 |
|
371 private void interruptServiceThreads() { |
|
372 interruptThread(writerThread); |
|
373 interruptThread(readerThread); |
|
374 } |
|
375 |
|
376 private void startWriterThread() { |
|
377 if (writerThread != null) { |
|
378 throw new IllegalStateException(); |
|
379 } |
|
380 writerThread = new WriterThread(writer); |
|
381 writerThread.setDaemon(true); |
|
382 writerThread.start(); |
|
383 } |
|
384 |
|
385 private ResponseDispatcherThread startResponseDispatcherThread() { |
|
386 ResponseDispatcherThread dispatcherThread; |
|
387 dispatcherThread = new ResponseDispatcherThread(); |
|
388 dispatcherThread.setDaemon(true); |
|
389 dispatcherThread.start(); |
|
390 return dispatcherThread; |
|
391 } |
|
392 |
|
393 private void interruptThread(Thread thread) { |
|
394 try { |
|
395 if (thread != null) { |
|
396 thread.interrupt(); |
|
397 } |
|
398 } catch (SecurityException e) { |
|
399 // ignore |
|
400 } |
|
401 } |
|
402 |
|
403 public void close() { |
|
404 if (isAttached()) { |
|
405 detach(true); |
|
406 } |
|
407 } |
|
408 |
|
409 public boolean isConnected() { |
|
410 return isAttached(); |
|
411 } |
|
412 |
|
413 public void send(Message message) { |
|
414 checkAttached(); |
|
415 sendMessage(message); |
|
416 } |
|
417 |
|
418 public void setNetListener(NetListener netListener) { |
|
419 if (this.listener != null && netListener != this.listener) { |
|
420 throw new IllegalStateException("Cannot change NetListener"); |
|
421 } |
|
422 this.listener = netListener != null |
|
423 ? netListener |
|
424 : NULL_LISTENER; |
|
425 } |
|
426 |
|
427 public void start() throws IOException { |
|
428 try { |
|
429 if (!isAttached()) { |
|
430 attach(); |
|
431 } |
|
432 } catch (IOException e) { |
|
433 listener.connectionClosed(); |
|
434 throw e; |
|
435 } |
|
436 } |
|
437 |
|
438 private void checkAttached() { |
|
439 if (!isAttached()) { |
|
440 throw new IllegalStateException("Connection not attached"); |
|
441 } |
|
442 } |
|
443 } |