1 /******************************************************************************* |
|
2 * Copyright (c) 2006, 2008 Wind River Systems and others. |
|
3 * All rights reserved. This program and the accompanying materials |
|
4 * are made available under the terms of the Eclipse Public License v1.0 |
|
5 * which accompanies this distribution, and is available at |
|
6 * http://www.eclipse.org/legal/epl-v10.html |
|
7 * |
|
8 * Contributors: |
|
9 * Wind River Systems - initial API and implementation |
|
10 *******************************************************************************/ |
|
11 package org.eclipse.cdt.examples.dsf.dataviewer.answers; |
|
12 |
|
13 import java.util.Collections; |
|
14 import java.util.HashSet; |
|
15 import java.util.Random; |
|
16 import java.util.Set; |
|
17 import java.util.concurrent.BlockingQueue; |
|
18 import java.util.concurrent.LinkedBlockingQueue; |
|
19 import java.util.concurrent.TimeUnit; |
|
20 import java.util.concurrent.atomic.AtomicBoolean; |
|
21 |
|
22 import org.eclipse.core.runtime.IStatus; |
|
23 import org.eclipse.core.runtime.ListenerList; |
|
24 import org.eclipse.core.runtime.Status; |
|
25 import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor; |
|
26 import org.eclipse.cdt.dsf.concurrent.RequestMonitor; |
|
27 import org.eclipse.cdt.examples.dsf.DsfExamplesPlugin; |
|
28 |
|
29 /** |
|
30 * Thread-based implementation of the data generator. |
|
31 * <p> |
|
32 * This generator is based around a queue of client requests and a thread which |
|
33 * reads the requests from the queue and processes them. The distinguishing |
|
34 * feature of this generator is that it uses a a blocking queue as the main |
|
35 * synchronization object. However, fListeners, fShutdown, and fChangedIndexes |
|
36 * fields also need to be thread-safe and so they implement their own |
|
37 * synchronization. |
|
38 * </p> |
|
39 */ |
|
40 public class DataGeneratorWithThread extends Thread implements IDataGenerator { |
|
41 |
|
42 // Request objects are used to serialize the interface calls into objects |
|
43 // which can then be pushed into a queue. |
|
44 abstract class Request { |
|
45 final RequestMonitor fRequestMonitor; |
|
46 |
|
47 Request(RequestMonitor rm) { |
|
48 fRequestMonitor = rm; |
|
49 } |
|
50 } |
|
51 |
|
52 class CountRequest extends Request { |
|
53 CountRequest(DataRequestMonitor<Integer> rm) { |
|
54 super(rm); |
|
55 } |
|
56 } |
|
57 |
|
58 class ItemRequest extends Request { |
|
59 final int fIndex; |
|
60 ItemRequest(int index, DataRequestMonitor<String> rm) { |
|
61 super(rm); |
|
62 fIndex = index; |
|
63 } |
|
64 } |
|
65 |
|
66 class ShutdownRequest extends Request { |
|
67 ShutdownRequest(RequestMonitor rm) { |
|
68 super(rm); |
|
69 } |
|
70 } |
|
71 |
|
72 // Main request queue of the data generator. The getValue(), getCount(), |
|
73 // and shutdown() methods write into the queue, while the run() method |
|
74 // reads from it. |
|
75 private final BlockingQueue<Request> fQueue = new LinkedBlockingQueue<Request>(); |
|
76 |
|
77 // ListenerList class provides thread safety. |
|
78 private ListenerList fListeners = new ListenerList(); |
|
79 |
|
80 // Current number of elements in this generator. |
|
81 private int fCount = MIN_COUNT; |
|
82 |
|
83 // Counter used to determine when to reset the element count. |
|
84 private int fCountResetTrigger = 0; |
|
85 |
|
86 // Elements which were modified since the last reset. |
|
87 private Set<Integer> fChangedIndexes = Collections.synchronizedSet(new HashSet<Integer>()); |
|
88 |
|
89 // Used to determine when to make changes in data. |
|
90 private long fLastChangeTime = System.currentTimeMillis(); |
|
91 |
|
92 // Flag indicating when the generator has been shut down. |
|
93 private AtomicBoolean fShutdown = new AtomicBoolean(false); |
|
94 |
|
95 public DataGeneratorWithThread() { |
|
96 // Immediately kick off the request processing thread. |
|
97 start(); |
|
98 } |
|
99 |
|
100 public void shutdown(RequestMonitor rm) { |
|
101 // Mark the generator as shut down. After the fShutdown flag is set, |
|
102 // all new requests should be shut down. |
|
103 if (!fShutdown.getAndSet(true)) { |
|
104 fQueue.add(new ShutdownRequest(rm)); |
|
105 } else { |
|
106 // |
|
107 rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down")); |
|
108 rm.done(); |
|
109 } |
|
110 } |
|
111 |
|
112 public void getCount(DataRequestMonitor<Integer> rm) { |
|
113 if (!fShutdown.get()) { |
|
114 fQueue.add(new CountRequest(rm)); |
|
115 } else { |
|
116 rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down")); |
|
117 rm.done(); |
|
118 } |
|
119 } |
|
120 |
|
121 public void getValue(int index, DataRequestMonitor<String> rm) { |
|
122 if (!fShutdown.get()) { |
|
123 fQueue.add(new ItemRequest(index, rm)); |
|
124 } else { |
|
125 rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down")); |
|
126 rm.done(); |
|
127 } |
|
128 } |
|
129 |
|
130 public void addListener(Listener listener) { |
|
131 fListeners.add(listener); |
|
132 } |
|
133 |
|
134 public void removeListener(Listener listener) { |
|
135 fListeners.remove(listener); |
|
136 } |
|
137 |
|
138 @Override |
|
139 public void run() { |
|
140 try { |
|
141 while(true) { |
|
142 // Get the next request from the queue. The time-out |
|
143 // ensures that that the random changes get processed. |
|
144 final Request request = fQueue.poll(100, TimeUnit.MILLISECONDS); |
|
145 |
|
146 // If a request was dequeued, process it. |
|
147 if (request != null) { |
|
148 // Simulate a processing delay. |
|
149 Thread.sleep(PROCESSING_DELAY); |
|
150 |
|
151 if (request instanceof CountRequest) { |
|
152 processCountRequest((CountRequest)request); |
|
153 } else if (request instanceof ItemRequest) { |
|
154 processItemRequest((ItemRequest)request); |
|
155 } else if (request instanceof ShutdownRequest) { |
|
156 // If shutting down, just break out of the while(true) |
|
157 // loop and thread will exit. |
|
158 request.fRequestMonitor.done(); |
|
159 break; |
|
160 } |
|
161 } |
|
162 |
|
163 // Simulate data changes. |
|
164 randomChanges(); |
|
165 } |
|
166 } |
|
167 catch (InterruptedException x) {} |
|
168 } |
|
169 |
|
170 private void processCountRequest(CountRequest request) { |
|
171 @SuppressWarnings("unchecked") // Suppress warning about lost type info. |
|
172 DataRequestMonitor<Integer> rm = (DataRequestMonitor<Integer>)request.fRequestMonitor; |
|
173 |
|
174 rm.setData(fCount); |
|
175 rm.done(); |
|
176 } |
|
177 |
|
178 private void processItemRequest(ItemRequest request) { |
|
179 @SuppressWarnings("unchecked") // Suppress warning about lost type info. |
|
180 DataRequestMonitor<String> rm = (DataRequestMonitor<String>)request.fRequestMonitor; |
|
181 |
|
182 if (fChangedIndexes.contains(request.fIndex)) { |
|
183 rm.setData("Changed: " + request.fIndex); |
|
184 } else { |
|
185 rm.setData(Integer.toString(request.fIndex)); |
|
186 } |
|
187 rm.done(); |
|
188 } |
|
189 |
|
190 |
|
191 private void randomChanges() { |
|
192 // Check if enough time is elapsed. |
|
193 if (System.currentTimeMillis() > fLastChangeTime + RANDOM_CHANGE_INTERVAL) { |
|
194 fLastChangeTime = System.currentTimeMillis(); |
|
195 |
|
196 // Once every number of changes, reset the count, the rest of the |
|
197 // times just change certain values. |
|
198 if (++fCountResetTrigger % RANDOM_COUNT_CHANGE_INTERVALS == 0){ |
|
199 randomCountReset(); |
|
200 } else { |
|
201 randomDataChange(); |
|
202 } |
|
203 } |
|
204 } |
|
205 |
|
206 private void randomCountReset() { |
|
207 // Calculate the new count. |
|
208 Random random = new java.util.Random(); |
|
209 fCount = MIN_COUNT + Math.abs(random.nextInt()) % (MAX_COUNT - MIN_COUNT); |
|
210 |
|
211 // Reset the changed values. |
|
212 fChangedIndexes.clear(); |
|
213 |
|
214 // Notify listeners |
|
215 for (Object listener : fListeners.getListeners()) { |
|
216 ((Listener)listener).countChanged(); |
|
217 } |
|
218 } |
|
219 |
|
220 private void randomDataChange() { |
|
221 // Calculate the indexes to change. |
|
222 Random random = new java.util.Random(); |
|
223 Set<Integer> set = new HashSet<Integer>(); |
|
224 for (int i = 0; i < fCount * RANDOM_CHANGE_SET_PERCENTAGE / 100; i++) { |
|
225 set.add( new Integer(Math.abs(random.nextInt()) % fCount) ); |
|
226 } |
|
227 |
|
228 // Add the indexes to an overall set of changed indexes. |
|
229 fChangedIndexes.addAll(set); |
|
230 |
|
231 // Notify listeners |
|
232 for (Object listener : fListeners.getListeners()) { |
|
233 ((Listener)listener).valuesChanged(set); |
|
234 } |
|
235 } |
|
236 } |
|
237 |
|
238 |
|