|
1 /* |
|
2 * Copyright (c) 2009 Nokia Corporation and/or its subsidiary(-ies). |
|
3 * All rights reserved. |
|
4 * This component and the accompanying materials are made available |
|
5 * under the terms of "Eclipse Public License v1.0" |
|
6 * which accompanies this distribution, and is available |
|
7 * at the URL "http://www.eclipse.org/legal/epl-v10.html". |
|
8 * |
|
9 * Initial Contributors: |
|
10 * Nokia Corporation - initial contribution. |
|
11 * |
|
12 * Contributors: |
|
13 * |
|
14 * Description: |
|
15 * This file contains the header file of the DataGatewaySocketWriterThread, |
|
16 * DataGatewaySocketReaderThread, DataGatewayClientThread and DataGateway |
|
17 * classes. |
|
18 */ |
|
19 |
|
20 // INCLUDES |
|
21 #include "datagateway.h" |
|
22 |
|
23 //********************************************************************************** |
|
24 // Class DataGatewaySocketWriterThread |
|
25 // |
|
26 // This thread is used to read Data from outgoing queue |
|
27 // and write it to Socket(connected to program using DataGateway) |
|
28 //********************************************************************************** |
|
29 |
|
30 DataGatewaySocketWriterThread::DataGatewaySocketWriterThread(SafeQueue<Data*>* q, |
|
31 Socket* s) |
|
32 : m_Running(true) |
|
33 { |
|
34 m_Queue = q; |
|
35 m_Socket = s; |
|
36 } |
|
37 |
|
38 /* |
|
39 * Main loop of thread |
|
40 * Reads Data from outgoing queue and writes it to Socket(connected to program using DataGateway) |
|
41 */ |
|
42 void DataGatewaySocketWriterThread::Run() |
|
43 { |
|
44 while (m_Running) |
|
45 { |
|
46 // Sending to TCP/IP port |
|
47 //Util::Debug("[DataGatewaySocketWriterThread] try to send"); |
|
48 try |
|
49 { |
|
50 Data* d = m_Queue->front(50); |
|
51 char* p = (char *)d->GetData(); |
|
52 //DWORD l = (DWORD)((((BYTE)p[g_HtiOffsetMessageSize] << 8) | (BYTE)p[g_HtiOffsetMessageSize+1])); |
|
53 DWORD l = d->GetLength(); |
|
54 |
|
55 if (Util::GetVerboseLevel() == Util::debug) |
|
56 { |
|
57 char tmp[64]; |
|
58 sprintf(tmp, "[DataGatewaySocketWriterThread] HTI MsgSize = %d", l); |
|
59 string s(tmp); |
|
60 Util::Debug(s); |
|
61 //Util::Hex(p, min(16,d->GetLength())); |
|
62 } |
|
63 |
|
64 //m_Socket->SendBytes((const unsigned char *)&p[g_HtiOffsetMessageData], l); |
|
65 m_Socket->SendBytes((const unsigned char *)p, l); |
|
66 Util::Debug("[DataGatewaySocketWriterThread] msg send"); |
|
67 m_Queue->pop(); |
|
68 delete d; |
|
69 d = NULL; |
|
70 |
|
71 } catch (TimeoutException te) |
|
72 { |
|
73 //Util::Debug("[DataGatewaySocketWriterThread]timeout exception"); |
|
74 } |
|
75 } |
|
76 } |
|
77 |
|
78 void DataGatewaySocketWriterThread::Stop() |
|
79 { |
|
80 m_Running = false; |
|
81 } |
|
82 |
|
83 bool DataGatewaySocketWriterThread::IsRunning() |
|
84 { |
|
85 return m_Running; |
|
86 } |
|
87 |
|
88 //********************************************************************************** |
|
89 // Class DataGatewaySocketReaderThread |
|
90 // |
|
91 // This thread is used to read incoming bytes from Socket(connected to program using DataGateway) |
|
92 // which it then encapsulates into Data objects and forwards to outgoing queue |
|
93 //********************************************************************************** |
|
94 |
|
95 DataGatewaySocketReaderThread::DataGatewaySocketReaderThread(SafeQueue<Data*>* q, |
|
96 long bufsize, |
|
97 Socket* s) |
|
98 : m_Running(true) |
|
99 { |
|
100 m_Queue = q; |
|
101 m_Socket = s; |
|
102 if (bufsize > 0) |
|
103 { |
|
104 m_TcpIpBufferSize = bufsize; |
|
105 } |
|
106 else |
|
107 { |
|
108 m_TcpIpBufferSize = g_DataGatewayDefaultTcpIpBufferSize; |
|
109 } |
|
110 } |
|
111 |
|
112 /* |
|
113 * Main loop of thread |
|
114 * Reads bytes from Socket(connected to program using DataGateway), encapsulates them to Data object and puts these to outgoing queue |
|
115 */ |
|
116 void DataGatewaySocketReaderThread::Run() |
|
117 { |
|
118 BYTE* buffer = new BYTE[m_TcpIpBufferSize]; |
|
119 while (m_Running) |
|
120 { |
|
121 // Reading from TCP/IP port |
|
122 Util::Debug("[DataGatewaySocketReaderThread] try to read"); |
|
123 int bytes_read = -1; |
|
124 bytes_read = m_Socket->ReceiveBytes(buffer, m_TcpIpBufferSize); |
|
125 if (bytes_read < 0) |
|
126 { |
|
127 Stop(); |
|
128 break; |
|
129 } |
|
130 if (bytes_read > 0) |
|
131 { |
|
132 Data* d = new Data((void *)buffer, bytes_read, Data::EData); |
|
133 if (Util::GetVerboseLevel() == Util::debug) |
|
134 { |
|
135 char tmp[64]; |
|
136 sprintf(tmp, "m_Socket->ReceiveBytes (%d (dec) bytes):", d->GetLength()); |
|
137 string s(tmp); |
|
138 Util::Debug(s); |
|
139 //Util::Hex(p, d->GetLength()); |
|
140 } |
|
141 |
|
142 m_Queue->push(d); |
|
143 d = NULL; |
|
144 } |
|
145 Sleep(50); |
|
146 } |
|
147 delete[] buffer; |
|
148 buffer = NULL; |
|
149 } |
|
150 |
|
151 void DataGatewaySocketReaderThread::Stop() |
|
152 { |
|
153 m_Running = false; |
|
154 } |
|
155 |
|
156 bool DataGatewaySocketReaderThread::IsRunning() |
|
157 { |
|
158 return m_Running; |
|
159 } |
|
160 |
|
161 //********************************************************************************** |
|
162 // Class DataGatewayClientThread |
|
163 // |
|
164 // This thread serves DataGateway's clients |
|
165 // Gets Data from incoming queue to which DataGatewaySocketReader has pushed it and forwards |
|
166 // them to CommChannelPlugin. |
|
167 // The thread also reads incoming data from CommChannelPlugin and forwards them to outgoing queue which |
|
168 // DataGatewaySocketWriter then reads |
|
169 //********************************************************************************** |
|
170 |
|
171 DataGatewayClientThread::DataGatewayClientThread(Socket** s, |
|
172 long bufsize, |
|
173 const string& commchannel) |
|
174 : m_ReaderThread(&m_ReaderQueue, bufsize, *s), |
|
175 m_WriterThread(&m_WriterQueue, *s), |
|
176 m_CommChannelPluginName(commchannel), |
|
177 m_Running(true) |
|
178 { |
|
179 m_CCLateInit = true; |
|
180 m_Socket = s; |
|
181 if (bufsize > 0) |
|
182 { |
|
183 m_TcpIpBufferSize = bufsize; |
|
184 } |
|
185 else |
|
186 { |
|
187 m_TcpIpBufferSize = g_DataGatewayDefaultTcpIpBufferSize; |
|
188 } |
|
189 } |
|
190 |
|
191 DataGatewayClientThread::DataGatewayClientThread(Socket** s, |
|
192 long bufsize, |
|
193 CommChannelPlugin** f) |
|
194 : m_ReaderThread(&m_ReaderQueue, bufsize, *s), |
|
195 m_WriterThread(&m_WriterQueue, *s), |
|
196 m_CommChannelPluginName((*f)->GetName()), |
|
197 m_Running(true) |
|
198 { |
|
199 m_CCLateInit = false; |
|
200 m_Socket = s; |
|
201 m_CommChannelPlugin = *f; |
|
202 if (bufsize > 0) |
|
203 { |
|
204 m_TcpIpBufferSize = bufsize; |
|
205 } |
|
206 else |
|
207 { |
|
208 m_TcpIpBufferSize = g_DataGatewayDefaultTcpIpBufferSize; |
|
209 } |
|
210 } |
|
211 |
|
212 DataGatewayClientThread::~DataGatewayClientThread() |
|
213 { |
|
214 Util::Debug("DataGatewayClientThread::~DataGatewayClientThread()"); |
|
215 if (m_Running) |
|
216 { |
|
217 Stop(); |
|
218 } |
|
219 } |
|
220 |
|
221 /* |
|
222 * Main loop of thread |
|
223 * Gets Data from incoming queue to which DataGatewaySocketReader has pushed it and forwards |
|
224 * them to CommChannelPlugin. |
|
225 * Reads incoming data from CommChannelPlugin and forwards them to outgoing queue which |
|
226 * DataGatewaySocketWriter then reads |
|
227 */ |
|
228 void DataGatewayClientThread::Run() |
|
229 { |
|
230 DWORD res; |
|
231 |
|
232 if (m_CCLateInit) |
|
233 { |
|
234 m_CommChannelPlugin = CommChannelPlugin::Instance(m_CommChannelPluginName); |
|
235 if (m_CommChannelPlugin == NULL) |
|
236 { |
|
237 g_ErrorCode = ERR_DG_COMMCHANNEL; |
|
238 return; |
|
239 } |
|
240 if ((res = m_CommChannelPlugin->Connect()) != NO_ERRORS) |
|
241 { |
|
242 Util::Error("[DataGateway] Error - Cannot connect to the target."); |
|
243 m_CommChannelPlugin->Disconnect(); |
|
244 g_ErrorCode = res; |
|
245 return; |
|
246 } |
|
247 Util::Info("[DataGateway] Communication Channel Plugin loaded succesfully"); |
|
248 } |
|
249 |
|
250 //Start DataGatewaySocketReader and DataGatewaySocketWriter threads |
|
251 m_ReaderThread.Start(); |
|
252 m_WriterThread.Start(); |
|
253 |
|
254 // Flush comm input buffer; |
|
255 Data* dummy; |
|
256 while (m_CommChannelPlugin->Receive(&dummy) == NO_ERRORS) continue; |
|
257 dummy = NULL; |
|
258 |
|
259 while (m_Running) |
|
260 { |
|
261 if (!m_ReaderThread.IsRunning() || |
|
262 !m_WriterThread.IsRunning()) |
|
263 { |
|
264 Stop(); |
|
265 break; |
|
266 } |
|
267 |
|
268 // Receiving from TCP/IP port and |
|
269 // sending to CommChannelPlugin |
|
270 try { |
|
271 Data* d = m_ReaderQueue.front(50); |
|
272 m_ReaderQueue.pop(); |
|
273 m_CommChannelPlugin->Send(d); |
|
274 d = NULL; |
|
275 } catch (TimeoutException te) {} |
|
276 |
|
277 // Receiving from CommChannelPlugin and |
|
278 // sending data to TCP/IP port. If message |
|
279 // is error or control message it is also |
|
280 // handled here. |
|
281 Data* out; |
|
282 |
|
283 if (m_CommChannelPlugin->Receive(&out) != NO_ERRORS) continue; |
|
284 |
|
285 switch (out->GetType()) |
|
286 { |
|
287 case Data::EData: |
|
288 { |
|
289 m_WriterQueue.push(out); |
|
290 out = NULL; |
|
291 } |
|
292 break; |
|
293 case Data::EControl: |
|
294 { |
|
295 Util::Debug("ClientThread: Control Message Received"); |
|
296 switch (*(BYTE*)out->GetData()) |
|
297 { |
|
298 case ControlPhonePowered: |
|
299 { |
|
300 Util::Info("[DataGateway] Phone powered up"); |
|
301 } |
|
302 break; |
|
303 } |
|
304 } |
|
305 break; |
|
306 case Data::EError: |
|
307 { |
|
308 Util::Debug("ClientThread: Error Message Received"); |
|
309 Stop(); |
|
310 } |
|
311 break; |
|
312 |
|
313 default: |
|
314 { |
|
315 Util::Debug("ClientThread: Unknown Message Received"); |
|
316 } |
|
317 break; |
|
318 } |
|
319 delete out; |
|
320 out = NULL; |
|
321 } |
|
322 delete *m_Socket; |
|
323 m_Socket = NULL; |
|
324 |
|
325 if (m_CCLateInit) |
|
326 { |
|
327 m_CommChannelPlugin->Disconnect(); |
|
328 Util::Info("[DataGateway] Communication Channel Plugin unloaded"); |
|
329 m_CommChannelPlugin = NULL; |
|
330 } |
|
331 } |
|
332 |
|
333 void DataGatewayClientThread::Stop() |
|
334 { |
|
335 m_Running = false; |
|
336 m_WriterThread.Stop(); |
|
337 m_ReaderThread.Stop(); |
|
338 HANDLE handles[2]; |
|
339 handles[0] = m_WriterThread.ThreadHandle(); |
|
340 handles[1] = m_ReaderThread.ThreadHandle(); |
|
341 WaitForMultipleObjects(2, handles, TRUE, g_MaximumShutdownWaitTime); |
|
342 } |
|
343 |
|
344 //********************************************************************************** |
|
345 // Class DataGateway |
|
346 // |
|
347 // This is the main thread of DataGateway |
|
348 //********************************************************************************** |
|
349 |
|
350 DataGateway::DataGateway(int port, |
|
351 long bufsize, |
|
352 const string& commchannel, |
|
353 bool stayalive, |
|
354 bool cclateinit) |
|
355 : m_TcpIpPort(port), |
|
356 m_TcpIpBufferSize(bufsize), |
|
357 m_CommChannelPluginName(commchannel), |
|
358 m_StayAlive(stayalive), |
|
359 m_CCLateInit(cclateinit), |
|
360 m_Running(true) |
|
361 { |
|
362 m_CommChannelPlugin = NULL; |
|
363 } |
|
364 |
|
365 /* |
|
366 * Main loop of DataGateway |
|
367 * This loop: |
|
368 * -creates instance of CommChannelPlugin if lateinit isn't set on |
|
369 * -starts listening to socket for incoming connections |
|
370 * -after connection has arrived starts DataGatewayClient thread to serve the client |
|
371 */ |
|
372 void DataGateway::Run() |
|
373 { |
|
374 DWORD res; |
|
375 try |
|
376 { |
|
377 if (Util::GetVerboseLevel() >= Util::info) |
|
378 { |
|
379 char tmp[256]; |
|
380 sprintf(tmp, "[DataGateway] Using TCP/IP port %d", m_TcpIpPort); |
|
381 string s(tmp); |
|
382 Util::Info(s); |
|
383 |
|
384 sprintf(tmp, "[DataGateway] TCP/IP receive buffer size is %d bytes", m_TcpIpBufferSize); |
|
385 s.assign(tmp); |
|
386 Util::Info(s); |
|
387 |
|
388 sprintf(tmp, "[DataGateway] Loading Communication Channel Plugin for [%s]", m_CommChannelPluginName.c_str()); |
|
389 s.assign(tmp); |
|
390 Util::Info(s); |
|
391 } |
|
392 |
|
393 |
|
394 //Util::Info("[DataGateway] TCP/IP port opened"); |
|
395 |
|
396 if (!m_CCLateInit) |
|
397 { |
|
398 //if not lateinit, CommChannelPlugin may be started |
|
399 m_CommChannelPlugin = CommChannelPlugin::Instance(m_CommChannelPluginName); |
|
400 if (m_CommChannelPlugin == NULL) |
|
401 { |
|
402 throw UtilError("[DataGateway] Error loading Communication Channel.", ERR_DG_COMMCHANNEL); |
|
403 } |
|
404 if ((res = m_CommChannelPlugin->Connect()) != NO_ERRORS) |
|
405 { |
|
406 m_CommChannelPlugin->Disconnect(); |
|
407 throw UtilError("[DataGateway] Error connecting to the target.", res); |
|
408 } |
|
409 Util::Info("[DataGateway] Communication Channel Plugin loaded succesfully"); |
|
410 } |
|
411 else |
|
412 { |
|
413 Util::Info("[DataGateway] Communication Channel Plugin uses late initialization."); |
|
414 } |
|
415 |
|
416 while (m_Running) |
|
417 { |
|
418 g_ErrorCode = NO_ERRORS; |
|
419 Util::Info("[DataGateway] Waiting connection"); |
|
420 Socket* s = new Socket(); |
|
421 SocketServer in; |
|
422 // will use the socket for listening and return a new socket when connection |
|
423 // is established |
|
424 String remoteHost( "[DataGateway] Connection established! Remote host : " ); |
|
425 in.Accept(s, m_TcpIpPort, 1, remoteHost); |
|
426 Util::Info(remoteHost); |
|
427 DataGatewayClientThread* client; |
|
428 if (m_CCLateInit) |
|
429 { |
|
430 // passes ownership of 's' |
|
431 client = new DataGatewayClientThread(&s, m_TcpIpBufferSize, m_CommChannelPluginName); |
|
432 } |
|
433 else |
|
434 { |
|
435 // passes ownership of 's' |
|
436 client = new DataGatewayClientThread(&s, m_TcpIpBufferSize, &m_CommChannelPlugin); |
|
437 } |
|
438 client->Start(); |
|
439 |
|
440 HANDLE handles[2]; |
|
441 handles[0] = client->ThreadHandle(); |
|
442 handles[1] = m_ShutdownEvent.EventHandle(); |
|
443 DWORD dwResult = WaitForMultipleObjects(2, handles, FALSE, INFINITE); |
|
444 switch (dwResult) |
|
445 { |
|
446 case WAIT_OBJECT_0 + 0: |
|
447 { |
|
448 Util::Debug("DataGateway::Run() Client thread stopped"); |
|
449 } |
|
450 break; |
|
451 case WAIT_OBJECT_0 + 1: |
|
452 { |
|
453 Util::Debug("DataGateway::Run() Request to shutdown"); |
|
454 client->Stop(); |
|
455 WaitForSingleObject(client->ThreadHandle(), g_MaximumShutdownWaitTime); |
|
456 } |
|
457 break; |
|
458 } |
|
459 Util::Info("[DataGateway] Connection closed."); |
|
460 delete client; |
|
461 client = NULL; |
|
462 |
|
463 if (!m_StayAlive) break; |
|
464 } |
|
465 if (!m_CCLateInit) |
|
466 { |
|
467 m_CommChannelPlugin->Disconnect(); |
|
468 Util::Info("[DataGateway] Communication Channel Plugin unloaded"); |
|
469 m_CommChannelPlugin = NULL; |
|
470 } |
|
471 } catch (char* s) { |
|
472 char tmp[64]; |
|
473 sprintf(tmp, "[DataGateway] Error opening TCP/IP port - %s", s); |
|
474 Util::Error(tmp); |
|
475 g_ErrorCode = ERR_DG_SOCKET; |
|
476 } catch (UtilError ue) { |
|
477 Util::Error(ue.iError, ue.iResult); |
|
478 g_ErrorCode = ue.iResult; |
|
479 } |
|
480 |
|
481 Util::Info("[DataGateway] Closed"); |
|
482 } |
|
483 |
|
484 void DataGateway::Stop() |
|
485 { |
|
486 m_Running = false; |
|
487 m_ShutdownEvent.Set(); |
|
488 } |
|
489 |
|
490 bool DataGateway::IsRunning() |
|
491 { |
|
492 return m_Running; |
|
493 } |
|
494 |
|
495 // End of the file |