00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00019 package cast.architecture;
00020
00021 import java.util.ArrayList;
00022 import java.util.LinkedList;
00023 import java.util.List;
00024 import java.util.Queue;
00025 import java.util.Vector;
00026 import java.util.concurrent.ConcurrentLinkedQueue;
00027 import java.util.concurrent.Semaphore;
00028
00029 import Ice.Current;
00030 import cast.CASTException;
00031 import cast.DoesNotExistOnWMException;
00032 import cast.SubarchitectureComponentException;
00033 import cast.UnknownSubarchitectureException;
00034 import cast.cdl.FilterRestriction;
00035 import cast.cdl.RECEIVERPRIORITYHIGH;
00036 import cast.cdl.RECEIVERPRIORITYLOW;
00037 import cast.cdl.RECEIVERPRIORITYMEDIUM;
00038 import cast.cdl.WorkingMemoryAddress;
00039 import cast.cdl.WorkingMemoryChange;
00040 import cast.cdl.WorkingMemoryChangeFilter;
00041 import cast.cdl.WorkingMemoryChangeQueueBehaviour;
00042 import cast.cdl.WorkingMemoryEntry;
00043 import cast.cdl.WorkingMemoryEntrySeqHolder;
00044 import cast.core.CASTData;
00045 import cast.core.CASTUtils;
00046 import cast.core.ControlledRunnable;
00047 import cast.core.SinglePlaceQueue;
00048 import cast.core.logging.ComponentLogger;
00049 import cast.interfaces.WorkingMemoryPrx;
00050 import cast.interfaces.WorkingMemoryPrxHelper;
00051 import cast.interfaces._WorkingMemoryReaderComponentOperations;
00052
00062 public abstract class WorkingMemoryReaderComponent extends
00063 WorkingMemoryWriterComponent implements
00064 _WorkingMemoryReaderComponentOperations {
00065
00066 public enum ChangeReceiverPriority {
00067 HIGH(RECEIVERPRIORITYHIGH.value), MEDIUM(RECEIVERPRIORITYMEDIUM.value), LOW(
00068 RECEIVERPRIORITYLOW.value);
00069 public int priority;
00070
00071 private ChangeReceiverPriority(int _priority) {
00072 priority = _priority;
00073 }
00074 }
00075
00082 private class WorkingMemoryChangeThread extends ControlledRunnable {
00083
// Pending change events. Filled by queueChange(), drained by
// runDiscard()/runQueue(); accesses are wrapped in synchronized (m_changes).
00087 private Queue<WorkingMemoryChange> m_changes;
00088
// The enclosing component this thread forwards changes for.
00092 private WorkingMemoryReaderComponent m_gdp;
00093
// Released once per queued change (and once on stop()) to wake the run loop.
00094 private Semaphore m_changeSemaphore;
00095
// Scratch collection reused for the receivers matching the change
// currently being forwarded; cleared after each change.
00099 private final Queue<WorkingMemoryChangeReceiver> m_receivers;
00100
/**
 * Creates the forwarding runnable for the given component.
 *
 * The change queue implementation is chosen from the component's queue
 * behaviour as it is at construction time; for any other behaviour value
 * the queue is left unassigned here.
 *
 * @param _gdp
 *            the component whose receivers will be called
 */
public WorkingMemoryChangeThread(WorkingMemoryReaderComponent _gdp) {
    m_gdp = _gdp;

    // pick the queue matching the configured behaviour
    if (m_gdp.m_queueBehaviour == WorkingMemoryChangeQueueBehaviour.DISCARD) {
        m_changes = new SinglePlaceQueue<WorkingMemoryChange>();
    } else if (m_gdp.m_queueBehaviour == WorkingMemoryChangeQueueBehaviour.QUEUE) {
        m_changes = new ConcurrentLinkedQueue<WorkingMemoryChange>();
    }

    // fair semaphore with no initial permits: the run loop blocks until
    // the first change is queued
    m_changeSemaphore = new Semaphore(0, true);
    m_receivers = new LinkedList<WorkingMemoryChangeReceiver>();
}
00121
00126 private boolean forwardToSubclass(WorkingMemoryChange[] changeList)
00127 throws SubarchitectureComponentException {
00128
00129 boolean used = false;
00130 try {
00131
00132 if (m_gdp.m_changeObjects != null) {
00133
00134
00135 removeChangeFilters();
00136
00137 for (WorkingMemoryChange wmc : changeList) {
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154 m_gdp.m_changeObjects.get(wmc, m_receivers);
00155
00156 if (!m_receivers.isEmpty()) {
00157
00158 logSubscribedChange(wmc);
00159
00160 for (WorkingMemoryChangeReceiver receiver : m_receivers) {
00161 try {
00162 receiver.workingMemoryChanged(wmc);
00163 used = true;
00164 } catch (CASTException e) {
00165 m_logger.error("*********************************************************");
00166 m_logger.error("*********************************************************");
00167 m_logger.error("Exception thrown by change receiver: "
00168 + e.message);
00169 m_logger.error("Change event: "
00170 + CASTUtils.toString(wmc));
00171 logException(e);
00172 m_logger.error("*********************************************************");
00173 m_logger.error("*********************************************************");
00174
00175 } catch (Throwable e) {
00176 m_logger.error("*********************************************************");
00177 m_logger.error("*********************************************************");
00178 m_logger.error("Something unexpected thrown by change receiver, exiting");
00179 m_logger.error("Change event: "
00180 + CASTUtils.toString(wmc));
00181 logException(e);
00182 m_logger.error("*********************************************************");
00183 m_logger.error("*********************************************************");
00184 System.exit(1);
00185 }
00186 }
00187 m_receivers.clear();
00188 } else {
00189 logUnsubscribedChange(wmc);
00190 }
00191
00192
00193 removeChangeFilters();
00194
00195 }
00196
00197 }
00198 } catch (java.util.ConcurrentModificationException e) {
00199 log(e.getStackTrace());
00200 logException(e);
00201 System.exit(0);
00202 throw e;
00203 }
00204
00205
00206 Thread.yield();
00207
00208 return used;
00209 }
00210
00211 private final void removeChangeFilters()
00212 throws SubarchitectureComponentException {
00213 if (!m_gdp.m_receiversToRemove.isEmpty()) {
00214 for (WorkingMemoryChangeReceiver receiver : m_gdp.m_receiversToRemove) {
00215 debug("removing receiver: " + receiver);
00216 m_gdp.removeChangeFilterHelper(receiver);
00217 }
00218 m_gdp.m_receiversToRemove.clear();
00219 }
00220 }
00221
00228 private void runDiscard() throws SubarchitectureComponentException {
00229 WorkingMemoryChange[] changeList = new WorkingMemoryChange[1];
00230 boolean used = false;
00231 while (isRunning()) {
00232
00233 while (isRunning() && !m_changes.isEmpty()) {
00234
00235
00236
00237 m_gdp.lockComponent();
00238
00239
00240
00241 synchronized (m_changes) {
00242 if (m_changes.size() > 1) {
00243 m_gdp.debug("discarding " + (m_changes.size() - 1)
00244 + " change events ");
00245 } else {
00246
00247 }
00248
00249 changeList[0] = m_changes.poll();
00250 m_changes.clear();
00251
00252 }
00253
00254 try {
00255
00256 used = forwardToSubclass(changeList);
00257 } catch (SubarchitectureComponentException e) {
00258 logException(e);
00259 } catch (RuntimeException e) {
00260 m_logger.error("Caught RuntimeException and rethrowing: "
00261 + e.getMessage());
00262 throw e;
00263 }
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273 m_gdp.unlockComponent();
00274
00275
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296 }
00297
00298 if (used) {
00299 synchronized (m_gdp.m_wmChangeLock) {
00300
00301 m_gdp.m_wmChangeLock.notifyAll();
00302 }
00303 }
00304 try {
00305 m_changeSemaphore.acquire();
00306 } catch (InterruptedException e) {
00307 logException(e);
00308 System.exit(1);
00309 }
00310
00311 }
00312 }
00313
00320 private void runQueue() throws SubarchitectureComponentException {
00321
00322 WorkingMemoryChange[] changeList;
00323 boolean used = false;
00324
00325 while (isRunning()) {
00326
00327 while (!m_changes.isEmpty()) {
00328
00329
00330
00331 m_gdp.lockComponent();
00332 synchronized (m_changes) {
00333
00334 changeList = new WorkingMemoryChange[m_changes.size()];
00335
00336 for (int i = 0; i < changeList.length; i++) {
00337 changeList[i] = m_changes.poll();
00338 }
00339
00340 assert m_changes.size() == 0 : "emptied change list is not empty!";
00341
00342
00343
00344
00345
00346
00347 }
00348 try {
00349
00350 used = forwardToSubclass(changeList);
00351 } catch (SubarchitectureComponentException e) {
00352 logException(e);
00353 } catch (RuntimeException e) {
00354 m_logger.error("Caught RuntimeException and rethrowing: "
00355 + e.getMessage());
00356 throw e;
00357 }
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371 m_gdp.unlockComponent();
00372
00373 if (used) {
00374 synchronized (m_gdp.m_wmChangeLock) {
00375
00376 m_gdp.m_wmChangeLock.notifyAll();
00377 }
00378 }
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400 }
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415 try {
00416 m_changeSemaphore.acquire();
00417 } catch (InterruptedException e) {
00418 logException(e);
00419 System.exit(1);
00420 }
00421
00422 }
00423 }
00424
00431 public void queueChange(WorkingMemoryChange _change) {
00432
00433
00434
00435 synchronized (m_changes) {
00436 m_changes.add(_change);
00437 }
00438
00439
00440
00441
00442
00443
00444 m_changeSemaphore.release();
00445
00446
00447
00448
00449 }
00450
00456 public void run() {
00457 try {
00458 if (m_gdp.m_queueBehaviour == WorkingMemoryChangeQueueBehaviour.DISCARD) {
00459 m_changes = new SinglePlaceQueue<WorkingMemoryChange>();
00460 runDiscard();
00461 } else if (m_gdp.m_queueBehaviour == WorkingMemoryChangeQueueBehaviour.QUEUE) {
00462 m_changes = new ConcurrentLinkedQueue<WorkingMemoryChange>();
00463 runQueue();
00464 }
00465 } catch (SubarchitectureComponentException e) {
00466 logException(e);
00467 System.exit(1);
00468 }
00469 }
00470
00474 public void stop() {
00475
00476 super.stop();
00477
00478 synchronized (m_changes) {
00479 m_changes.clear();
00480 }
00481
00482
00483
00484
00485
00486 m_changeSemaphore.release();
00487 }
00488
00489 }
00490
// Monitor used by waitForChanges(); the change thread and stopInternal()
// call notifyAll() on it.
00491 private Object m_wmChangeLock;
00492
// Runnable that queues incoming changes and forwards them to receivers.
00496 private WorkingMemoryChangeThread m_wmChangeRunnable;
00497
// Thread executing m_wmChangeRunnable; created in startInternal().
00501 private Thread m_wmChangeThread;
00502
// Receivers whose filters are to be removed; filled by removeChangeFilter()
// and processed by the change thread.
00503 private Vector<WorkingMemoryChangeReceiver> m_receiversToRemove;
00504
// When false, receiveChangeEvent() ignores incoming change events.
00505 protected boolean m_bReceivingChanges;
00506
// Initialised to false in the constructor; not read anywhere in this class.
00511 protected boolean m_makeCopyOnRead;
00512
// Maps registered change filters to their receivers.
00513 private WorkingMemoryChangeFilterMap<WorkingMemoryChangeReceiver> m_changeObjects;
00514
00515
00516
00517
00518
00519
00520
00521
// Selects the run loop and queue type of the change thread (DISCARD or
// QUEUE); defaults to QUEUE in the constructor.
00526 protected WorkingMemoryChangeQueueBehaviour m_queueBehaviour;
00527
// Trace loggers created in startInternal(): reads, changes without a
// matching receiver, and changes with at least one matching receiver.
00531 private ComponentLogger m_loggerForGets;
00532 private ComponentLogger m_loggerForUnsubscribedChanges;
00533 private ComponentLogger m_loggerForSubscribedChanges;
00534
// Proxy used for read operations; set up in setWorkingMemory().
00538 private WorkingMemoryPrx m_workingMemoryForRead;
00539
/**
 * Creates a reader component that receives change events, queues all of
 * them (QUEUE behaviour) and does not copy entries on read.
 */
public WorkingMemoryReaderComponent() {
    receiveChanges();

    // must be set before the runnable is built: its constructor reads it
    m_queueBehaviour = WorkingMemoryChangeQueueBehaviour.QUEUE;
    m_wmChangeRunnable = new WorkingMemoryChangeThread(this);

    m_makeCopyOnRead = false;
    m_wmChangeLock = new Object();

    m_receiversToRemove = new Vector<WorkingMemoryChangeReceiver>();
    m_changeObjects = new WorkingMemoryChangeFilterMap<WorkingMemoryChangeReceiver>();
}
00559
00560
00561
00562
00563
00564
00565
00566
00567
00568
00569
00570
00571
00572
00573
00574
00575
00576
00577
00578
00579
00580
00581 protected int getFilterCount() {
00582 return m_changeObjects.size();
00583 }
00584
00592 private void removeChangeFilterHelper(
00593 WorkingMemoryChangeReceiver _workingMemoryChangeReceiver)
00594 throws SubarchitectureComponentException {
00595
00596 Vector<WorkingMemoryChangeFilter> removed = new Vector<WorkingMemoryChangeFilter>();
00597 m_changeObjects.remove(_workingMemoryChangeReceiver, removed);
00598
00599 for (WorkingMemoryChangeFilter filter : removed) {
00600 m_workingMemory.removeComponentFilter(filter);
00601 }
00602
00603 }
00604
00624 public void addChangeFilter(WorkingMemoryChangeFilter _filter,
00625 WorkingMemoryChangeReceiver _receiver) {
00626 addChangeFilter(_filter, _receiver, ChangeReceiverPriority.MEDIUM);
00627 }
00628
00649 public void addChangeFilter(WorkingMemoryChangeFilter _filter,
00650 WorkingMemoryChangeReceiver _receiver,
00651 ChangeReceiverPriority _priority) {
00652 addChangeFilter(_filter, _receiver, _priority.priority);
00653 }
00654
00680 public void addChangeFilter(WorkingMemoryChangeFilter _filter,
00681 WorkingMemoryChangeReceiver _receiver, int _priority) {
00682
00683 assert (m_workingMemory != null);
00684
00685
00686
00687 if (_filter.restriction == FilterRestriction.LOCALSA
00688 && _filter.address.subarchitecture.length() == 0) {
00689 _filter.address.subarchitecture = getSubarchitectureID();
00690 }
00691
00692 else if (_filter.address.subarchitecture.equals(getSubarchitectureID())
00693 && _filter.restriction == FilterRestriction.ALLSA) {
00694 _filter.restriction = FilterRestriction.LOCALSA;
00695 }
00696
00697
00698 _filter.origin = getComponentID();
00699
00700
00701
00702 m_changeObjects.put(_filter, _receiver, _priority);
00703
00704
00705
00706 m_workingMemory.registerComponentFilter(_filter, _priority);
00707 }
00708
00709 void getBaseWorkingMemoryEntries(String _type,
00710 List<WorkingMemoryEntry> _entries, String _subarch, int _count)
00711 throws UnknownSubarchitectureException {
00712 assert (_subarch.length() != 0);
00713 assert (_type.length() != 0);
00714 assert (_entries != null);
00715
00716 WorkingMemoryEntrySeqHolder holder = new WorkingMemoryEntrySeqHolder();
00717 m_workingMemoryForRead.getWorkingMemoryEntries(_type, _subarch, _count,
00718 getComponentID(), holder);
00719
00720
00721 for (WorkingMemoryEntry entry : holder.value) {
00722 updateVersion(entry.id, entry.version);
00723 logGet(entry.id, _subarch, entry.type, entry.version);
00724 _entries.add(entry);
00725 }
00726 }
00727
00728 void getBaseWorkingMemoryEntries(String _type,
00729 List<WorkingMemoryEntry> _entries, String _subarch)
00730 throws UnknownSubarchitectureException {
00731 getBaseWorkingMemoryEntries(_type, _entries, _subarch, 0);
00732 }
00733
00734 void getBaseWorkingMemoryEntries(String _type,
00735 List<WorkingMemoryEntry> _entries, int _count) {
00736 try {
00737 getBaseWorkingMemoryEntries(_type, _entries,
00738 getSubarchitectureID(), _count);
00739 } catch (UnknownSubarchitectureException e) {
00740 throw new RuntimeException(
00741 "Shouldn't happen on own subarchitecture", e);
00742 }
00743 }
00744
00758 public <Type extends Ice.Object> CASTData<Type>[] getWorkingMemoryEntries(
00759 Class<Type> _cls, int _count)
00760 throws SubarchitectureComponentException {
00761 return getWorkingMemoryEntries(getSubarchitectureID(), _cls, _count);
00762 }
00763
00774 public <Type extends Ice.Object> CASTData<Type>[] getWorkingMemoryEntries(
00775 Class<Type> _cls) throws SubarchitectureComponentException {
00776 return getWorkingMemoryEntries(_cls, 0);
00777 }
00778
00792 public <Type extends Ice.Object> CASTData<Type>[] getWorkingMemoryEntries(
00793 String _subarch, Class<Type> _cls)
00794 throws SubarchitectureComponentException {
00795
00796 return getWorkingMemoryEntries(_subarch, _cls, 0);
00797 }
00798
00816 @SuppressWarnings("unchecked")
00817 @Deprecated
00818 public <Type extends Ice.Object> CASTData<Type>[] getWorkingMemoryEntries(
00819 String _subarch, Class<Type> _cls, int _count)
00820 throws UnknownSubarchitectureException {
00821 String type = CASTUtils.typeName(_cls);
00822 List<WorkingMemoryEntry> entries = new ArrayList<WorkingMemoryEntry>();
00823 getBaseWorkingMemoryEntries(type, entries, _subarch, _count);
00824 CASTData<Type>[] array = new CASTData[entries.size()];
00825
00826 int i = 0;
00827 for (WorkingMemoryEntry entry : entries) {
00828 array[i++] = new CASTData<Type>(entry, _cls);
00829 }
00830
00831 return array;
00832 }
00833
00834 public <Type extends Ice.Object> void getMemoryEntries(Class<Type> _cls,
00835 List<Type> _entries, String _subarch, int _count)
00836 throws UnknownSubarchitectureException {
00837 String type = CASTUtils.typeName(_cls);
00838 List<WorkingMemoryEntry> entries = new ArrayList<WorkingMemoryEntry>();
00839 getBaseWorkingMemoryEntries(type, entries, _subarch, _count);
00840 for (WorkingMemoryEntry entry : entries) {
00841 _entries.add(_cls.cast(entry.entry));
00842 }
00843 }
00844
00845 public <Type extends Ice.Object> void getMemoryEntries(Class<Type> _cls,
00846 List<Type> _entries) throws UnknownSubarchitectureException {
00847 getMemoryEntries(_cls, _entries, getSubarchitectureID(), 0);
00848 }
00849
00850 public <Type extends Ice.Object> void getMemoryEntries(Class<Type> _cls,
00851 List<Type> _entries, String _subarch)
00852 throws UnknownSubarchitectureException {
00853 getMemoryEntries(_cls, _entries, _subarch, 0);
00854 }
00855
00856 public <Type extends Ice.Object> void getMemoryEntries(Class<Type> _cls,
00857 List<Type> _entries, int _count) {
00858 try {
00859 getMemoryEntries(_cls, _entries, getSubarchitectureID(), _count);
00860 } catch (UnknownSubarchitectureException e) {
00861 throw new RuntimeException(
00862 "Shouldn't happen on own subarchitecture", e);
00863 }
00864 }
00865
00866 public <Type extends Ice.Object> void getMemoryEntriesWithData(
00867 Class<Type> _cls, List<CASTData<Type>> _entries, String _subarch,
00868 int _count) throws UnknownSubarchitectureException {
00869 String type = CASTUtils.typeName(_cls);
00870 List<WorkingMemoryEntry> entries = new ArrayList<WorkingMemoryEntry>();
00871 getBaseWorkingMemoryEntries(type, entries, _subarch, _count);
00872 for (WorkingMemoryEntry entry : entries) {
00873 _entries.add(new CASTData<Type>(entry, _cls));
00874 }
00875 }
00876
00877 public <Type extends Ice.Object> void getMemoryEntriesWithDataWithData(
00878 Class<Type> _cls, List<CASTData<Type>> _entries, String _subarch)
00879 throws UnknownSubarchitectureException {
00880 getMemoryEntriesWithData(_cls, _entries, _subarch, 0);
00881 }
00882
00883 public <Type extends Ice.Object> void getMemoryEntriesWithData(
00884 Class<Type> _cls, List<CASTData<Type>> _entries, int _count) {
00885 try {
00886 getMemoryEntriesWithData(_cls, _entries, getSubarchitectureID(),
00887 _count);
00888 } catch (UnknownSubarchitectureException e) {
00889 throw new RuntimeException(
00890 "Shouldn't happen on own subarchitecture", e);
00891 }
00892 }
00893
00894 WorkingMemoryEntry getBaseMemoryEntry(String _id)
00895 throws DoesNotExistOnWMException {
00896 try {
00897 return getBaseMemoryEntry(_id, getSubarchitectureID());
00898 } catch (UnknownSubarchitectureException e) {
00899 throw new RuntimeException(
00900 "Shouldn't happen on own subarchitecture", e);
00901 }
00902 }
00903
00904 WorkingMemoryEntry getBaseMemoryEntry(WorkingMemoryAddress _wma)
00905 throws DoesNotExistOnWMException, UnknownSubarchitectureException {
00906 return getBaseMemoryEntry(_wma.id, _wma.subarchitecture);
00907 }
00908
00909 WorkingMemoryEntry getBaseMemoryEntry(String _id, String _subarch)
00910 throws DoesNotExistOnWMException, UnknownSubarchitectureException {
00911
00912 assert (_id != null);
00913 assert (_subarch != null);
00914 assert (_id.length() != 0);
00915 assert (_subarch.length() != 0);
00916 assert (m_workingMemory != null);
00917 WorkingMemoryEntry entry = m_workingMemoryForRead
00918 .getWorkingMemoryEntry(_id, _subarch, getComponentID());
00919 assert (entry != null);
00920
00921 updateVersion(entry.id, entry.version);
00922 logGet(_id, _subarch, entry.type, entry.version);
00923 return entry;
00924 }
00925
00937 @Deprecated
00938 public CASTData<?> getWorkingMemoryEntry(String _id)
00939 throws DoesNotExistOnWMException {
00940 try {
00941 return getWorkingMemoryEntry(_id, getSubarchitectureID());
00942 } catch (UnknownSubarchitectureException e) {
00943 throw new RuntimeException(
00944 "Shouldn't happen on own subarchitecture", e);
00945 }
00946 }
00947
00960 @SuppressWarnings("unchecked")
00961 @Deprecated
00962 public CASTData<?> getWorkingMemoryEntry(String _id, String _subarch)
00963 throws DoesNotExistOnWMException, UnknownSubarchitectureException {
00964 WorkingMemoryEntry entry = getBaseMemoryEntry(_id, _subarch);
00965 return new CASTData(entry.id, entry.type, entry.version, entry.entry);
00966 }
00967
00978 @Deprecated
00979 public CASTData<?> getWorkingMemoryEntry(WorkingMemoryAddress _wma)
00980 throws SubarchitectureComponentException {
00981 return getWorkingMemoryEntry(_wma.id, _wma.subarchitecture);
00982 }
00983
00984 public <T extends Ice.Object> T getMemoryEntry(String _id, Class<T> _cls)
00985 throws DoesNotExistOnWMException {
00986 try {
00987 return getMemoryEntry(_id, getSubarchitectureID(), _cls);
00988 } catch (UnknownSubarchitectureException e) {
00989 throw new RuntimeException(
00990 "Shouldn't happen on own subarchitecture", e);
00991 }
00992 }
00993
00994 public <T extends Ice.Object> T getMemoryEntry(String _id, String _subarch,
00995 Class<T> _cls) throws DoesNotExistOnWMException,
00996 UnknownSubarchitectureException {
00997 WorkingMemoryEntry entry = getBaseMemoryEntry(_id, _subarch);
00998 return _cls.cast(entry.entry);
00999 }
01000
01001 public <T extends Ice.Object> T getMemoryEntry(WorkingMemoryAddress _wma,
01002 Class<T> _cls) throws DoesNotExistOnWMException,
01003 UnknownSubarchitectureException {
01004 return getMemoryEntry(_wma.id, _wma.subarchitecture, _cls);
01005 }
01006
01007 public <T extends Ice.Object> CASTData<T> getMemoryEntryWithData(
01008 String _id, Class<T> _cls) throws DoesNotExistOnWMException {
01009 try {
01010 return getMemoryEntryWithData(_id, getSubarchitectureID(), _cls);
01011 } catch (UnknownSubarchitectureException e) {
01012 throw new RuntimeException(
01013 "Shouldn't happen on own subarchitecture", e);
01014 }
01015 }
01016
01017 public <T extends Ice.Object> CASTData<T> getMemoryEntryWithData(
01018 String _id, String _subarch, Class<T> _cls)
01019 throws DoesNotExistOnWMException, UnknownSubarchitectureException {
01020 WorkingMemoryEntry entry = getBaseMemoryEntry(_id, _subarch);
01021 return new CASTData<T>(entry, _cls);
01022 }
01023
01024 public <T extends Ice.Object> CASTData<T> getMemoryEntryWithData(
01025 WorkingMemoryAddress _wma, Class<T> _cls)
01026 throws DoesNotExistOnWMException, UnknownSubarchitectureException {
01027 return getMemoryEntryWithData(_wma.id, _wma.subarchitecture, _cls);
01028 }
01029
01033 protected void receiveChanges() {
01034 m_bReceivingChanges = true;
01035 }
01036
01040 protected void receiveNoChanges() {
01041 m_bReceivingChanges = false;
01042 }
01043
01044 public void removeChangeFilter(
01045 WorkingMemoryChangeReceiver _workingMemoryChangeReceiver)
01046 throws SubarchitectureComponentException {
01047 m_receiversToRemove.add(_workingMemoryChangeReceiver);
01048 }
01049
01054 public void waitForChanges() {
01055 synchronized (m_wmChangeLock) {
01056 try {
01057 m_wmChangeLock.wait();
01058 } catch (InterruptedException e) {
01059 logException(e);
01060 System.exit(1);
01061 }
01062 }
01063 }
01064
01072 public void receiveChangeEvent(WorkingMemoryChange _wmc, Current __current) {
01073
01074
01075 if (isRunning() && m_bReceivingChanges) {
01076
01077
01078 if (m_changeObjects != null) {
01079 m_wmChangeRunnable.queueChange(_wmc);
01080 }
01081 }
01082 }
01083
01090 @Override
01091 public void startInternal() {
01092 super.startInternal();
01093
01094 m_wmChangeRunnable.start();
01095 m_wmChangeThread = new Thread(m_wmChangeRunnable, getComponentID()
01096 + "-receiver");
01097
01098 m_wmChangeThread.setPriority(Thread.NORM_PRIORITY + 1);
01099 m_wmChangeThread.start();
01100
01101 m_loggerForGets = getLogger(".wm.rw.get");
01102 m_loggerForSubscribedChanges = getLogger(".wm.ch.sub");
01103 m_loggerForUnsubscribedChanges = getLogger(".wm.ch.all");
01104 }
01105
01115 private final void logGet(String _id, String _subarch, String _type,
01116 int _version) {
01117
01118 if (m_loggerForGets.isTraceEnabled()) {
01119 m_loggerForGets.trace(CASTUtils.concatenate("get,",
01120 CASTUtils.toString(getCASTTime()), ",", _id, ",", _subarch,
01121 ",", _type, ",", _version), getLogAdditions());
01122 }
01123 }
01124
01130 protected void logChange(WorkingMemoryChange _wmc, ComponentLogger _logger) {
01131 _logger.trace(CASTUtils.concatenate("wmc,",
01132 CASTUtils.toString(getCASTTime()), ",", _wmc.operation, ",",
01133 _wmc.address.id, ",", _wmc.address.subarchitecture, ",",
01134 _wmc.type), getLogAdditions());
01135 }
01136
01142 private void logUnsubscribedChange(WorkingMemoryChange _wmc) {
01143 if (m_loggerForUnsubscribedChanges.isTraceEnabled()) {
01144 logChange(_wmc, m_loggerForUnsubscribedChanges);
01145 }
01146 }
01147
01153 private void logSubscribedChange(WorkingMemoryChange _wmc) {
01154 if (m_loggerForSubscribedChanges.isTraceEnabled()) {
01155 logChange(_wmc, m_loggerForSubscribedChanges);
01156 }
01157 }
01158
01165 @Override
01166 public void stopInternal() {
01167 super.stopInternal();
01168
01169
01170 m_wmChangeRunnable.stop();
01171 try {
01172 m_wmChangeThread.join();
01173 } catch (InterruptedException e) {
01174 logException(e);
01175 }
01176
01177
01178 synchronized (m_wmChangeLock) {
01179 m_wmChangeLock.notifyAll();
01180 }
01181 }
01182
01186 @Override
01187 public void setWorkingMemory(WorkingMemoryPrx _wm, Current __current) {
01188 super.setWorkingMemory(_wm, __current);
01189
01190
01191 if (m_workingMemory.ice_isCollocationOptimized()) {
01192
01193
01194 m_workingMemoryForRead = WorkingMemoryPrxHelper
01195 .uncheckedCast(m_workingMemory
01196 .ice_collocationOptimized(false));
01197 } else {
01198 m_workingMemoryForRead = m_workingMemory;
01199 }
01200 }
01201
01210 protected boolean resetReadCollocationOptimisation() {
01211 m_workingMemoryForRead = m_workingMemory;
01212 return m_workingMemoryForRead.ice_isCollocationOptimized();
01213 }
01214
01223 protected void turnOffReadCollocationOptimisation() {
01224
01225 if (m_workingMemoryForRead.ice_isCollocationOptimized()) {
01226 m_workingMemoryForRead = WorkingMemoryPrxHelper
01227 .uncheckedCast(m_workingMemory
01228 .ice_collocationOptimized(false));
01229 }
01230 }
01231
01232 }