cdt/cdt_6_0_x/org.eclipse.cdt.examples.dsf/src/org/eclipse/cdt/examples/dsf/dataviewer/answers/DataGeneratorWithThread.java
changeset 51 49c226a8748e
parent 50 fcb77f9783d2
child 52 42077b7eab6e
equal deleted inserted replaced
50:fcb77f9783d2 51:49c226a8748e
     1 /*******************************************************************************
       
     2  * Copyright (c) 2006, 2008 Wind River Systems and others.
       
     3  * All rights reserved. This program and the accompanying materials
       
     4  * are made available under the terms of the Eclipse Public License v1.0
       
     5  * which accompanies this distribution, and is available at
       
     6  * http://www.eclipse.org/legal/epl-v10.html
       
     7  * 
       
     8  * Contributors:
       
     9  *     Wind River Systems - initial API and implementation
       
    10  *******************************************************************************/
       
    11 package org.eclipse.cdt.examples.dsf.dataviewer.answers;
       
    12 
       
    13 import java.util.Collections;
       
    14 import java.util.HashSet;
       
    15 import java.util.Random;
       
    16 import java.util.Set;
       
    17 import java.util.concurrent.BlockingQueue;
       
    18 import java.util.concurrent.LinkedBlockingQueue;
       
    19 import java.util.concurrent.TimeUnit;
       
    20 import java.util.concurrent.atomic.AtomicBoolean;
       
    21 
       
    22 import org.eclipse.core.runtime.IStatus;
       
    23 import org.eclipse.core.runtime.ListenerList;
       
    24 import org.eclipse.core.runtime.Status;
       
    25 import org.eclipse.cdt.dsf.concurrent.DataRequestMonitor;
       
    26 import org.eclipse.cdt.dsf.concurrent.RequestMonitor;
       
    27 import org.eclipse.cdt.examples.dsf.DsfExamplesPlugin;
       
    28 
       
    29 /**
       
    30  * Thread-based implementation of the data generator.
       
    31  * <p>
       
    32  * This generator is based around a queue of client requests and a thread which 
       
    33  * reads the requests from the queue and processes them. The distinguishing 
       
    34  * feature of this generator is that it uses a a blocking queue as the main 
       
    35  * synchronization object.  However, fListeners, fShutdown,  and fChangedIndexes 
       
    36  * fields also need to be thread-safe and so they implement their own 
       
    37  * synchronization.
       
    38  * </p>
       
    39  */
       
    40 public class DataGeneratorWithThread extends Thread implements IDataGenerator {
       
    41 
       
    42     // Request objects are used to serialize the interface calls into objects
       
    43     // which can then be pushed into a queue.
       
    44     abstract class Request {
       
    45         final RequestMonitor fRequestMonitor;
       
    46         
       
    47         Request(RequestMonitor rm) {
       
    48             fRequestMonitor = rm;
       
    49         }
       
    50     }
       
    51     
       
    52     class CountRequest extends Request {
       
    53         CountRequest(DataRequestMonitor<Integer> rm) { 
       
    54             super(rm); 
       
    55         }
       
    56     } 
       
    57 
       
    58     class ItemRequest extends Request {
       
    59         final int fIndex;
       
    60         ItemRequest(int index, DataRequestMonitor<String> rm) { 
       
    61             super(rm);
       
    62             fIndex = index; 
       
    63         }
       
    64     } 
       
    65 
       
    66     class ShutdownRequest extends Request {
       
    67         ShutdownRequest(RequestMonitor rm) { 
       
    68             super(rm);
       
    69         }
       
    70     }
       
    71 
       
    72     // Main request queue of the data generator.  The getValue(), getCount(), 
       
    73     // and shutdown() methods write into the queue, while the run() method 
       
    74     // reads from it.
       
    75     private final BlockingQueue<Request> fQueue = new LinkedBlockingQueue<Request>();
       
    76 
       
    77     // ListenerList class provides thread safety.
       
    78     private ListenerList fListeners = new ListenerList();
       
    79     
       
    80     // Current number of elements in this generator.
       
    81     private int fCount = MIN_COUNT;
       
    82     
       
    83     // Counter used to determine when to reset the element count.
       
    84     private int fCountResetTrigger = 0;
       
    85     
       
    86     // Elements which were modified since the last reset.
       
    87     private Set<Integer> fChangedIndexes = Collections.synchronizedSet(new HashSet<Integer>());
       
    88     
       
    89     // Used to determine when to make changes in data.
       
    90     private long fLastChangeTime = System.currentTimeMillis();
       
    91 
       
    92     // Flag indicating when the generator has been shut down.
       
    93     private AtomicBoolean fShutdown = new AtomicBoolean(false);
       
    94  
       
    95     public DataGeneratorWithThread() {
       
    96         // Immediately kick off the request processing thread.
       
    97         start();
       
    98     }
       
    99      
       
   100     public void shutdown(RequestMonitor rm) {
       
   101         // Mark the generator as shut down.  After the fShutdown flag is set,
       
   102         // all new requests should be shut down.
       
   103         if (!fShutdown.getAndSet(true)) {
       
   104             fQueue.add(new ShutdownRequest(rm));
       
   105         } else {
       
   106             // 
       
   107             rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
       
   108             rm.done();
       
   109         }        
       
   110     }
       
   111 
       
   112     public void getCount(DataRequestMonitor<Integer> rm) {
       
   113         if (!fShutdown.get()) {
       
   114             fQueue.add(new CountRequest(rm));
       
   115         } else {
       
   116             rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
       
   117             rm.done();
       
   118         }        
       
   119     }
       
   120     
       
   121     public void getValue(int index, DataRequestMonitor<String> rm) { 
       
   122         if (!fShutdown.get()) {
       
   123             fQueue.add(new ItemRequest(index, rm));
       
   124         } else {
       
   125             rm.setStatus(new Status(IStatus.ERROR, DsfExamplesPlugin.PLUGIN_ID, "Supplier shut down"));
       
   126             rm.done();
       
   127         }        
       
   128     } 
       
   129 
       
   130     public void addListener(Listener listener) {
       
   131         fListeners.add(listener);
       
   132     }
       
   133 
       
   134     public void removeListener(Listener listener) {
       
   135         fListeners.remove(listener);
       
   136     }
       
   137     
       
   138     @Override
       
   139     public void run() {
       
   140         try {
       
   141             while(true) {
       
   142                 // Get the next request from the queue.  The time-out 
       
   143                 // ensures that that the random changes get processed. 
       
   144                 final Request request = fQueue.poll(100, TimeUnit.MILLISECONDS);
       
   145                 
       
   146                 // If a request was dequeued, process it.
       
   147                 if (request != null) {
       
   148                     // Simulate a processing delay.
       
   149                     Thread.sleep(PROCESSING_DELAY);
       
   150                     
       
   151                     if (request instanceof CountRequest) {
       
   152                         processCountRequest((CountRequest)request);
       
   153                     } else if (request instanceof ItemRequest) {
       
   154                         processItemRequest((ItemRequest)request);
       
   155                     } else if (request instanceof ShutdownRequest) {
       
   156                         // If shutting down, just break out of the while(true) 
       
   157                         // loop and thread will exit.
       
   158                         request.fRequestMonitor.done();
       
   159                         break;
       
   160                     }
       
   161                 }
       
   162                 
       
   163                 // Simulate data changes.
       
   164                 randomChanges();
       
   165             }
       
   166         }
       
   167         catch (InterruptedException x) {}
       
   168     } 
       
   169 
       
   170     private void processCountRequest(CountRequest request) {
       
   171         @SuppressWarnings("unchecked") // Suppress warning about lost type info.
       
   172         DataRequestMonitor<Integer> rm = (DataRequestMonitor<Integer>)request.fRequestMonitor;
       
   173         
       
   174         rm.setData(fCount);
       
   175         rm.done();
       
   176     }
       
   177 
       
   178     private void processItemRequest(ItemRequest request) {
       
   179         @SuppressWarnings("unchecked") // Suppress warning about lost type info.
       
   180         DataRequestMonitor<String> rm = (DataRequestMonitor<String>)request.fRequestMonitor; 
       
   181 
       
   182         if (fChangedIndexes.contains(request.fIndex)) {
       
   183             rm.setData("Changed: " + request.fIndex);
       
   184         } else {
       
   185             rm.setData(Integer.toString(request.fIndex));
       
   186         }
       
   187         rm.done();
       
   188     } 
       
   189  
       
   190     
       
   191     private void randomChanges() {
       
   192         // Check if enough time is elapsed.
       
   193         if (System.currentTimeMillis() > fLastChangeTime + RANDOM_CHANGE_INTERVAL) {
       
   194             fLastChangeTime = System.currentTimeMillis();
       
   195             
       
   196             // Once every number of changes, reset the count, the rest of the 
       
   197             // times just change certain values.
       
   198             if (++fCountResetTrigger % RANDOM_COUNT_CHANGE_INTERVALS == 0){
       
   199                 randomCountReset();
       
   200             } else {
       
   201                 randomDataChange();
       
   202             }
       
   203         }
       
   204     }
       
   205      
       
   206     private void randomCountReset() {
       
   207         // Calculate the new count.
       
   208         Random random = new java.util.Random();
       
   209         fCount = MIN_COUNT + Math.abs(random.nextInt()) % (MAX_COUNT - MIN_COUNT);
       
   210 
       
   211         // Reset the changed values.
       
   212         fChangedIndexes.clear();
       
   213         
       
   214         // Notify listeners  
       
   215         for (Object listener : fListeners.getListeners()) {
       
   216             ((Listener)listener).countChanged();
       
   217         }
       
   218     }
       
   219      
       
   220     private void randomDataChange() {
       
   221         // Calculate the indexes to change.
       
   222         Random random = new java.util.Random();
       
   223         Set<Integer> set = new HashSet<Integer>();
       
   224         for (int i = 0; i < fCount * RANDOM_CHANGE_SET_PERCENTAGE / 100; i++) {
       
   225             set.add( new Integer(Math.abs(random.nextInt()) % fCount) );
       
   226         }                
       
   227 
       
   228         // Add the indexes to an overall set of changed indexes.
       
   229         fChangedIndexes.addAll(set);
       
   230         
       
   231         // Notify listeners  
       
   232         for (Object listener : fListeners.getListeners()) {
       
   233             ((Listener)listener).valuesChanged(set);
       
   234         }
       
   235     }    
       
   236 }
       
   237 
       
   238