00001
00004 package cast.core;
00005
00006 import java.util.Queue;
00007 import java.util.concurrent.ConcurrentLinkedQueue;
00008 import java.util.concurrent.Semaphore;
00009
00017 public abstract class QueuedDataRunnable<QueuedData> extends ControlledRunnable {
00018
00022 private Queue<QueuedData> m_queue;
00023
00024 private Semaphore m_changeSemaphore;
00025
00026 public QueuedDataRunnable() {
00027 m_changeSemaphore = new Semaphore(0, true);
00028 }
00029
00030
00031
00032
00033
00034
00035 public void run() {
00036
00037 QueuedData data;
00038
00039 while (isRunning()) {
00040
00041 while (m_queue != null && !m_queue.isEmpty()) {
00042
00043 synchronized (m_queue) {
00044 data = m_queue.poll();
00045 }
00046
00047 nextInQueue(data);
00048
00049 }
00050
00051 try {
00052 m_changeSemaphore.acquire();
00053 } catch (InterruptedException e) {
00054 e.printStackTrace();
00055 System.exit(1);
00056 }
00057
00058 }
00059
00060 }
00061
00062 protected abstract void nextInQueue(QueuedData _data);
00063
00064 public void queue(QueuedData _data) {
00065
00066 if (m_queue == null) {
00067 m_queue = new ConcurrentLinkedQueue<QueuedData>();
00068 }
00069
00070 synchronized (m_queue) {
00071 m_queue.add(_data);
00072 }
00073 m_changeSemaphore.release();
00074 }
00075
00076 }