Merge "Delete SpecPropertyConfiguration class"
diff --git a/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java b/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java
index f1839b1..58ed8b9 100644
--- a/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java
+++ b/integrity-audit/src/main/java/org/onap/policy/common/ia/AuditThread.java
@@ -25,10 +25,7 @@
 import java.util.Date;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-
 import org.onap.policy.common.ia.jpa.IntegrityAuditEntity;
 import org.onap.policy.common.logging.eelf.MessageCodes;
 import org.onap.policy.common.logging.flexlogger.FlexLogger;
@@ -108,18 +105,6 @@
     private IntegrityAudit integrityAudit;
 
     /**
-     * A latch is taken from this queue before starting an audit. May be {@code null}. Used by JUnit
-     * tests.
-     */
-    private BlockingQueue<CountDownLatch> auditLatchQueue;
-
-    /**
-     * Latch to be decremented when the next audit completes. May be {@code null}. Used by JUnit
-     * tests to wait for an audit to complete.
-     */
-    private CountDownLatch auditCompletionLatch = null;
-
-    /**
      * AuditThread constructor.
      * 
      * @param resourceName the resource name
@@ -133,7 +118,7 @@
             int integrityAuditPeriodSeconds, IntegrityAudit integrityAudit) throws IntegrityAuditException {
 
         this(resourceName, persistenceUnit, properties, TimeUnit.SECONDS.toMillis(integrityAuditPeriodSeconds),
-                integrityAudit, null);
+                integrityAudit);
     }
 
     /**
@@ -148,13 +133,12 @@
      * @throws IntegrityAuditException if an error occurs
      */
     public AuditThread(String resourceName, String persistenceUnit, Properties properties, long integrityAuditMillis,
-            IntegrityAudit integrityAudit, BlockingQueue<CountDownLatch> queue) throws IntegrityAuditException {
+            IntegrityAudit integrityAudit) throws IntegrityAuditException {
         this.resourceName = resourceName;
         this.persistenceUnit = persistenceUnit;
         this.properties = properties;
         this.integrityAuditPeriodMillis = integrityAuditMillis;
         this.integrityAudit = integrityAudit;
-        this.auditLatchQueue = queue;
 
         /*
          * The DbDAO Constructor registers this node in the IntegrityAuditEntity table. Each
@@ -174,14 +158,8 @@
         logger.info("AuditThread.run: Entering");
 
         try {
-            /*
-             * For JUnit testing: wait for the first latch, decrement it to indicate that the thread
-             * has started, and then wait for the next latch, before we actually start doing
-             * anything. These simply return if there is no latch queue defined.
-             */
-            getNextLatch();
-            decrementLatch();
-            getNextLatch();
+            // for junit testing
+            runStarted();
 
             /*
              * Triggers change in designation, unless no other viable candidate.
@@ -284,11 +262,8 @@
                      * property, otherwise just sleep the normal interval.
                      */
                     if (auditCompleted) {
-                        // indicate that an audit has completed
-                        decrementLatch();
-
-                        // don't start the next audit cycle until a latch has been provided
-                        getNextLatch();
+                        // for junit testing: indicate that an audit has completed
+                        auditCompleted();
 
                         if (logger.isDebugEnabled()) {
                             logger.debug("AuditThread.run: Audit completed; resourceName=" + this.resourceName
@@ -342,29 +317,6 @@
     }
 
     /**
-     * Gets the next audit-completion latch from the queue. Blocks, if the queue is empty.
-     * 
-     * @throws InterruptedException if interrupted while waiting
-     */
-    private void getNextLatch() throws InterruptedException {
-        BlockingQueue<CountDownLatch> queue = this.auditLatchQueue;
-        if (queue != null) {
-            this.auditCompletionLatch = queue.take();
-        }
-    }
-
-    /**
-     * Decrements the current audit-completion latch, if any.
-     */
-    private void decrementLatch() {
-        CountDownLatch latch = this.auditCompletionLatch;
-        if (latch != null) {
-            this.auditCompletionLatch = null;
-            latch.countDown();
-        }
-    }
-
-    /**
      * Determines if an exception is an InterruptedException or was caused by an
      * InterruptedException.
      * 
@@ -788,6 +740,26 @@
     }
 
     /**
+     * Indicates that the {@link #run()} method has started. This method simply returns,
+     * and may be overridden by junit tests.
+     * 
+     * @throws InterruptedException never by this implementation; declared so overrides can block
+     */
+    public void runStarted() throws InterruptedException {
+        // does nothing
+    }
+
+    /**
+     * Indicates that an audit has completed. This method simply returns, and may
+     * be overridden by junit tests.
+     * 
+     * @throws InterruptedException never by this implementation; declared so overrides can block
+     */
+    public void auditCompleted() throws InterruptedException {
+        // does nothing
+    }
+
+    /**
      * Adjusts the thread-sleep-interval to be used when an audit has <i>not</i> been completed.
      * Used by JUnit tests.
      * 
diff --git a/integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java b/integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java
index 9abdbe5..b3330fa 100644
--- a/integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java
+++ b/integrity-audit/src/main/java/org/onap/policy/common/ia/IntegrityAudit.java
@@ -21,9 +21,6 @@
 package org.onap.policy.common.ia;
 
 import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-
 import org.onap.policy.common.ia.IntegrityAuditProperties.NodeTypeEnum;
 import org.onap.policy.common.logging.flexlogger.FlexLogger;
 import org.onap.policy.common.logging.flexlogger.Logger;
@@ -217,37 +214,20 @@
      * @throws IntegrityAuditException if an error occurs
      */
     public void startAuditThread() throws IntegrityAuditException {
-        startAuditThread(null);
-    }
-
-    /**
-     * Starts the audit thread.
-     * 
-     * @param queue the queue
-     * @return {@code true} if the thread was started, {@code false} otherwise
-     * @throws IntegrityAuditException if an error occurs
-     */
-    protected boolean startAuditThread(BlockingQueue<CountDownLatch> queue) throws IntegrityAuditException {
-
         logger.info("startAuditThread: Entering");
 
-        boolean success = false;
-
         if (integrityAuditPeriodMillis >= 0) {
-            this.auditThread = new AuditThread(this.resourceName, this.persistenceUnit, this.properties,
-                    integrityAuditPeriodMillis, this, queue);
+            this.auditThread = makeAuditThread(this.resourceName, this.persistenceUnit, this.properties, integrityAuditPeriodMillis);
             logger.info("startAuditThread: Audit started and will run every " + integrityAuditPeriodMillis / 1000
                     + " seconds");
             this.auditThread.start();
-            success = true;
+            
         } else {
             logger.info("startAuditThread: Suppressing integrity audit, integrityAuditPeriodSeconds="
                     + integrityAuditPeriodMillis / 1000);
         }
 
         logger.info("startAuditThread: Exiting");
-
-        return success;
     }
 
     /**
@@ -299,4 +279,29 @@
             return !this.auditThread.isAlive();
         }
     }
+
+    /**
+     * Determines whether this object currently holds an audit thread.
+     * @return {@code true} if an audit thread exists, {@code false} otherwise
+     */
+    protected boolean haveAuditThread() {
+        return (this.auditThread != null);
+    }
+
+    /**
+     * Creates an audit thread. May be overridden by junit tests.
+     * 
+     * @param resourceName2 the resource name to pass to the new thread
+     * @param persistenceUnit2 the persistence unit to pass to the new thread
+     * @param properties2 the properties to pass to the new thread
+     * @param integrityAuditPeriodMillis2 the audit period, in milliseconds, to pass to the new thread
+     * 
+     * @return a new audit thread
+     * @throws IntegrityAuditException if the {@link AuditThread} constructor throws it
+     */
+    protected AuditThread makeAuditThread(String resourceName2, String persistenceUnit2, Properties properties2,
+                    long integrityAuditPeriodMillis2) throws IntegrityAuditException {
+        
+        return new AuditThread(resourceName2, persistenceUnit2, properties2, integrityAuditPeriodMillis2, this);
+    }
 }
diff --git a/integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java b/integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java
index afbcc45..c917990 100644
--- a/integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java
+++ b/integrity-audit/src/test/java/org/onap/policy/common/ia/IntegrityAuditTestBase.java
@@ -22,10 +22,6 @@
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.Logger;
-
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -34,21 +30,19 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-
 import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
 import javax.persistence.EntityTransaction;
 import javax.persistence.Persistence;
-
 import org.onap.policy.common.utils.jpa.EntityMgrCloser;
 import org.onap.policy.common.utils.jpa.EntityMgrFactoryCloser;
 import org.onap.policy.common.utils.jpa.EntityTransCloser;
 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
 import org.slf4j.LoggerFactory;
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
 
 /**
  * All JUnits are designed to run in the local development environment where they have write
@@ -464,26 +458,26 @@
     protected void runAudit(MyIntegrityAudit... auditors) throws InterruptedException {
 
         // start an audit cycle on each auditor
-        List<CountDownLatch> latches = new ArrayList<>(auditors.length);
+        List<Semaphore> semaphores = new ArrayList<>(auditors.length);
         for (MyIntegrityAudit p : auditors) {
-            latches.add(p.startAudit());
+            semaphores.add(p.startAudit());
         }
 
         // wait for each auditor to complete its cycle
-        for (CountDownLatch latch : latches) {
-            waitLatch(latch);
+        for (Semaphore sem : semaphores) {
+            waitSem(sem);
         }
     }
 
     /**
-     * Waits for a latch to reach zero.
+     * Waits for a semaphore to be released.
      * 
-     * @param latch the latch to wait for
+     * @param sem the semaphore for which to wait
      * @throws InterruptedException if the thread is interrupted
      * @throws AssertionError if the latch did not reach zero in the allotted time
      */
-    protected void waitLatch(CountDownLatch latch) throws InterruptedException {
-        assertTrue(latch.await(WAIT_MS, TimeUnit.SECONDS));
+    protected void waitSem(Semaphore sem) throws InterruptedException {
+        assertTrue(sem.tryAcquire(WAIT_MS, TimeUnit.MILLISECONDS)); // unit per the WAIT_MS name
     }
 
     /**
@@ -520,11 +514,16 @@
      * Manages audits by inserting latches into a queue for the AuditThread to count.
      */
     protected class MyIntegrityAudit extends IntegrityAudit {
-
+        
         /**
-         * Queue from which the AuditThread will take latches.
+         * Semaphore on which the audit thread should wait.
          */
-        private BlockingQueue<CountDownLatch> queue = null;
+        private Semaphore auditSem = null;
+        
+        /**
+         * Semaphore on which the junit management thread should wait.
+         */
+        private Semaphore junitSem = null;
 
         /**
          * Constructs an auditor and starts the AuditThread.
@@ -550,16 +549,14 @@
         }
 
         /**
-         * Triggers an audit by adding a latch to the queue.
+         * Triggers an audit by releasing the audit thread's semaphore.
          * 
-         * @return the latch that was added
+         * @return the semaphore on which to wait
          * @throws InterruptedException if the thread is interrupted
          */
-        public CountDownLatch startAudit() throws InterruptedException {
-            CountDownLatch latch = new CountDownLatch(1);
-            queue.add(latch);
-
-            return latch;
+        public Semaphore startAudit() throws InterruptedException {
+            auditSem.release(); // unblocks the audit thread waiting in auditSem.acquire()
+            return junitSem; // released by the audit thread from its auditCompleted() hook
         }
 
         /**
@@ -567,25 +564,23 @@
          */
         @Override
         public final void startAuditThread() throws IntegrityAuditException {
-            if (queue != null) {
-                // queue up a bogus latch, in case a thread is still running
-                queue.add(new CountDownLatch(1) {
-                    @Override
-                    public void countDown() {
-                        throw new RuntimeException("auditor has multiple threads");
-                    }
-                });
+            if (auditSem != null) {
+                // release a bunch of semaphores, in case a thread is still running
+                auditSem.release(1000);
             }
+            
+            auditSem = new Semaphore(0);
+            junitSem = new Semaphore(0);
+            
+            super.startAuditThread();
 
-            queue = new LinkedBlockingQueue<>();
+            if (haveAuditThread()) {
+                // tell the thread it can run
+                auditSem.release();
 
-            if (super.startAuditThread(queue)) {
                 // wait for the thread to start
-                CountDownLatch latch = new CountDownLatch(1);
-                queue.add(latch);
-
                 try {
-                    waitLatch(latch);
+                    waitSem(junitSem);
 
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
@@ -605,5 +600,31 @@
 
             assertTrue(waitThread(this));
         }
+
+        @Override
+        protected AuditThread makeAuditThread(String resourceName2, String persistenceUnit2, Properties properties2,
+                        long integrityAuditPeriodMillis2) throws IntegrityAuditException {
+            // returns an AuditThread whose hooks hand control back and forth with the junit thread
+            return new AuditThread(resourceName2, persistenceUnit2, properties2, integrityAuditPeriodMillis2, this) {
+                // captured at construction; startAuditThread() assigns fresh semaphores to the outer fields
+                private Semaphore auditSem = MyIntegrityAudit.this.auditSem;
+                private Semaphore junitSem = MyIntegrityAudit.this.junitSem;
+
+                @Override
+                public void runStarted() throws InterruptedException {
+                    auditSem.acquire();
+                    // report that the thread has started, then wait for permission to continue
+                    junitSem.release();
+                    auditSem.acquire();
+                }
+
+                @Override
+                public void auditCompleted() throws InterruptedException {
+                    junitSem.release(); // report that an audit cycle has finished
+                    auditSem.acquire(); // block until released again, e.g. by startAudit()
+                }
+                
+            };
+        }
     }
 }
diff --git a/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java b/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java
index c32a221..38dc20d 100644
--- a/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java
+++ b/integrity-monitor/src/main/java/org/onap/policy/common/im/IntegrityMonitor.java
@@ -29,10 +29,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-
 import javax.management.JMX;
 import javax.management.MBeanServerConnection;
 import javax.persistence.EntityManager;
@@ -43,7 +40,6 @@
 import javax.persistence.Persistence;
 import javax.persistence.Query;
 import javax.validation.constraints.NotNull;
-
 import org.onap.policy.common.im.jmx.ComponentAdmin;
 import org.onap.policy.common.im.jmx.ComponentAdminMBean;
 import org.onap.policy.common.im.jmx.JmxAgentConnection;
@@ -196,7 +192,7 @@
      */
     protected IntegrityMonitor(String resourceName, Properties properties) throws IntegrityMonitorException {
 
-        this(resourceName, properties, null);
+        this(resourceName, properties, new Factory());
     }
 
     /**
@@ -207,10 +203,10 @@
      * 
      * @param resourceName The resource name of the resource
      * @param properties a set of properties passed in from the resource
-     * @param queue queue to use to control the FPManager thread, or {@code null}
+     * @param factory Factory to use to control the FPManager thread
      * @throws IntegrityMonitorException if any errors are encountered in the constructor
      */
-    protected IntegrityMonitor(String resourceName, Properties properties, BlockingQueue<CountDownLatch> queue)
+    protected IntegrityMonitor(String resourceName, Properties properties, Factory factory)
             throws IntegrityMonitorException {
 
         // singleton check since this constructor can be called from a child or
@@ -357,7 +353,8 @@
             logger.error("ComponentAdmin constructor exception: {}", e.toString(), e);
         }
 
-        fpManager = new FpManager(queue);
+        fpManager = new FpManager(factory);
+        fpManager.start();
 
     }
 
@@ -373,7 +370,7 @@
      */
     public static IntegrityMonitor getInstance(String resourceName, Properties properties)
             throws IntegrityMonitorException {
-        return getInstance(resourceName, properties, null);
+        return getInstance(resourceName, properties, new Factory());
     }
 
     /**
@@ -382,13 +379,13 @@
      * 
      * @param resourceName The resource name of the resource
      * @param properties a set of properties passed in from the resource
-     * @param queue queue to use to control the FPManager thread, or {@code null}
+     * @param factory Factory to use to control the FPManager thread
      * @return The new instance of IntegrityMonitor
      * @throws IntegrityMonitorException if unable to create jmx url or the constructor returns an
      *         exception
      */
     protected static IntegrityMonitor getInstance(String resourceName, Properties properties,
-            BlockingQueue<CountDownLatch> queue) throws IntegrityMonitorException {
+                    Factory factory) throws IntegrityMonitorException {
 
         synchronized (getInstanceLock) {
             logger.debug("getInstance() called - resourceName= {}", resourceName);
@@ -399,7 +396,7 @@
 
             if (instance == null) {
                 logger.debug("Creating new instance of IntegrityMonitor");
-                instance = new IntegrityMonitor(resourceName, properties, queue);
+                instance = new IntegrityMonitor(resourceName, properties, factory);
             }
             return instance;
         }
@@ -1740,18 +1737,15 @@
      * dependencies, does a refresh state audit and runs the stateAudit.
      */
     class FpManager extends Thread {
-        private final CountDownLatch stopper = new CountDownLatch(1);
+        private volatile boolean stopRequested = false; // set by stopAndExit(), polled by run()
 
-        private BlockingQueue<CountDownLatch> queue;
-        private CountDownLatch progressLatch = null;
+        private final Factory factory;
 
         // Constructor - start FP manager thread
-        FpManager(BlockingQueue<CountDownLatch> queue) {
-            this.queue = queue;
+        FpManager(Factory factory) {
+            this.factory = factory;
             // set now as the last time the refreshStateAudit ran
             IntegrityMonitor.this.refreshStateAuditLastRunDate = new Date();
-            // start thread
-            this.start();
         }
 
         @Override
@@ -1759,13 +1753,13 @@
             logger.debug("FPManager thread running");
 
             try {
-                getLatch();
-                decrementLatch();
+                factory.runStarted();
 
-                while (!stopper.await(cycleIntervalMillis, TimeUnit.MILLISECONDS)) {
-                    getLatch();
+                while(!stopRequested) {
+                    factory.doSleep(cycleIntervalMillis);
+                    
                     IntegrityMonitor.this.runOnce();
-                    decrementLatch();
+                    factory.monitorCompleted();
                 }
 
             } catch (InterruptedException e) {
@@ -1775,31 +1769,9 @@
         }
 
         public void stopAndExit() {
-            stopper.countDown();
+            stopRequested = true;
             this.interrupt();
         }
-
-        /**
-         * Gets the next latch from the queue.
-         * 
-         * @throws InterruptedException
-         * 
-         */
-        private void getLatch() throws InterruptedException {
-            if (queue != null) {
-                progressLatch = queue.take();
-            }
-        }
-
-        /**
-         * Decrements the current latch.
-         */
-        private void decrementLatch() {
-            if (progressLatch != null) {
-                progressLatch.countDown();
-            }
-        }
-
     }
 
     private void runOnce() {
@@ -1934,6 +1906,40 @@
         return allNotWellMap;
     }
 
+    /**
+     * Hooks invoked by the {@link FpManager} thread. May be overridden by junit tests.
+     */
+    public static class Factory {
+
+        /**
+         * Indicates that the {@link FpManager#run()} method has started. This method
+         * simply returns.
+         * 
+         * @throws InterruptedException never by this implementation; declared so overrides can block
+         */
+        public void runStarted() throws InterruptedException {
+            // does nothing
+        }
+
+        /**
+         * Sleeps for a period of time, using {@link Thread#sleep(long)}.
+         * @param sleepMs amount of time to sleep, in milliseconds
+         * @throws InterruptedException if the thread is interrupted while sleeping
+         */
+        public void doSleep(long sleepMs) throws InterruptedException {
+            Thread.sleep(sleepMs);
+        }
+
+        /**
+         * Indicates that a monitor activity has completed. This method simply returns.
+         * 
+         * @throws InterruptedException never by this implementation; declared so overrides can block
+         */
+        public void monitorCompleted() throws InterruptedException {
+            // does nothing
+        }
+    }
+
     /*
      * The remaining methods are used by JUnit tests.
      */
diff --git a/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java b/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java
index 7f1e551..091dcc9 100644
--- a/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java
+++ b/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTest.java
@@ -22,23 +22,19 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-
 import java.util.Date;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import java.util.concurrent.Semaphore;
 import javax.persistence.EntityTransaction;
 import javax.persistence.Query;
 import javax.persistence.TemporalType;
-
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.onap.policy.common.im.IntegrityMonitor.Factory;
 import org.onap.policy.common.im.jpa.ForwardProgressEntity;
 import org.onap.policy.common.im.jpa.ResourceRegistrationEntity;
 import org.onap.policy.common.im.jpa.StateManagementEntity;
@@ -57,7 +53,8 @@
     private static EntityTransaction et;
     private static String resourceName;
 
-    private BlockingQueue<CountDownLatch> queue;
+    private Semaphore monitorSem;
+    private Semaphore junitSem;
 
     /**
      * Set up for test class.
@@ -900,9 +897,36 @@
     private IntegrityMonitor makeMonitor(String resourceName, Properties myProp) throws Exception {
         IntegrityMonitor.deleteInstance();
 
-        queue = new LinkedBlockingQueue<>();
+        monitorSem = new Semaphore(0);
+        junitSem = new Semaphore(0);
+        
+        Factory factory = new IntegrityMonitor.Factory() {
 
-        IntegrityMonitor im = IntegrityMonitor.getInstance(resourceName, myProp, queue);
+            @Override
+            public void doSleep(long sleepMs) throws InterruptedException {
+                /*
+                 * No need to sleep, as the thread won't progress until the
+                 * semaphore is released.
+                 */
+            }
+
+            @Override
+            public void runStarted() throws InterruptedException {
+                monitorSem.acquire();
+                
+                junitSem.release();
+                monitorSem.acquire();
+            }
+
+            @Override
+            public void monitorCompleted() throws InterruptedException {
+                junitSem.release();
+                monitorSem.acquire();
+            }
+            
+        };
+
+        IntegrityMonitor im = IntegrityMonitor.getInstance(resourceName, myProp, factory);
 
         // wait for the monitor thread to start
         waitStep();
@@ -916,8 +940,7 @@
      * @throws InterruptedException if the thread is interrupted
      */
     private void waitStep() throws InterruptedException {
-        CountDownLatch latch = new CountDownLatch(1);
-        queue.offer(latch);
-        waitLatch(latch);
+        monitorSem.release();
+        waitSem(junitSem);
     }
 }
diff --git a/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java b/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java
index 0c8259b..e556230 100644
--- a/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java
+++ b/integrity-monitor/src/test/java/org/onap/policy/common/im/IntegrityMonitorTestBase.java
@@ -22,17 +22,14 @@
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-
 import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
 import javax.persistence.Persistence;
-
 import org.onap.policy.common.utils.jpa.EntityTransCloser;
 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
 import org.slf4j.Logger;
@@ -243,14 +240,14 @@
     }
 
     /**
-     * Waits for a latch to reach zero.
+     * Waits for a semaphore to be acquired.
      * 
-     * @param latch the latch
+     * @param sem the semaphore to acquire
      * @throws InterruptedException if the thread is interrupted
      * @throws AssertionError if the latch did not reach zero in the allotted time
      */
-    protected void waitLatch(CountDownLatch latch) throws InterruptedException {
-        assertTrue(latch.await(WAIT_MS, TimeUnit.SECONDS));
+    protected void waitSem(Semaphore sem) throws InterruptedException {
+        assertTrue(sem.tryAcquire(WAIT_MS, TimeUnit.MILLISECONDS));
     }
 
     /**
diff --git a/utils-test/pom.xml b/utils-test/pom.xml
index a7c2eae..933104b 100644
--- a/utils-test/pom.xml
+++ b/utils-test/pom.xml
@@ -42,6 +42,11 @@
 			<groupId>ch.qos.logback</groupId>
 			<artifactId>logback-classic</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>org.onap.policy.common</groupId>
+			<artifactId>utils</artifactId>
+			<version>${project.version}</version>
+		</dependency>
 	</dependencies>
 
 	<build>
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java
new file mode 100644
index 0000000..3dfed4b
--- /dev/null
+++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTime.java
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Common Utils-Test
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * "Current" time, when running junit tests. This is intended to be injected into classes
+ * under test, to replace their {@link CurrentTime} objects. When {@link #sleep(long)} is
+ * invoked, it simply advances the notion of "current" time and returns immediately.
+ */
+public class TestTime extends CurrentTime {
+
+    /**
+     * "Current" time, in milliseconds, used by tests.
+     */
+    private final AtomicLong tcur = new AtomicLong(System.currentTimeMillis());
+
+    /**
+     * Constructs the object, starting "current" time at the system clock's present value.
+     */
+    public TestTime() {
+        super();
+    }
+
+    @Override
+    public long getMillis() {
+        return tcur.get();
+    }
+
+    @Override
+    public Date getDate() {
+        return new Date(tcur.get());
+    }
+
+    @Override
+    public void sleep(long sleepMs) throws InterruptedException {
+        tcur.addAndGet(sleepMs); // advances "current" time instead of blocking
+    }
+}
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java
new file mode 100644
index 0000000..7a8277c
--- /dev/null
+++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/TestTimeMulti.java
@@ -0,0 +1,200 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Common Utils-Test
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import java.util.Date;
+import java.util.PriorityQueue;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * "Current" time, when running junit tests in multiple threads. This is intended to be
+ * injected into classes under test, to replace their {@link CurrentTime} objects. The
+ * {@link #sleep(long)} method blocks until all threads enter and then it moves the notion
+ * of "current" time forward, allowing threads to resume, as the end of their sleep time
+ * is reached. Additional threads do not resume until all threads have once again entered
+ * {@link #sleep(long)} or when {@link #threadCompleted()} is invoked to indicate that a
+ * thread will not re-enter {@link #sleep(long)}.
+ */
+public class TestTimeMulti extends CurrentTime {
+
+    /**
+     * Number of threads that will be sleeping simultaneously.
+     */
+    private int nthreads;
+
+    /**
+     * "Current" time, in milliseconds, used by tests. Volatile, as it is read unlocked.
+     */
+    private volatile long tcur = System.currentTimeMillis();
+
+    /**
+     * Queue of sleeping threads waiting to be awakened.
+     */
+    private final PriorityQueue<Info> queue = new PriorityQueue<>();
+
+    /**
+     * Used to synchronize updates.
+     */
+    private final Object locker = new Object();
+
+    /**
+     * Constructs the object.
+     * @param nthreads number of threads that will be sleeping simultaneously
+     */
+    public TestTimeMulti(int nthreads) {
+        this.nthreads = nthreads;
+    }
+
+    @Override
+    public long getMillis() {
+        return tcur;
+    }
+
+    @Override
+    public Date getDate() {
+        return new Date(tcur);
+    }
+
+    @Override
+    public void sleep(long sleepMs) throws InterruptedException {
+        if (sleepMs <= 0) {
+            return;
+        }
+
+        Info info;
+        synchronized (locker) {
+            // compute the wake-up time under the lock, where "tcur" cannot change
+            info = new Info(tcur + sleepMs);
+            queue.add(info);
+
+            if (queue.size() == nthreads) {
+                // all threads are now sleeping - wake one up
+                wakeThreads();
+            }
+        }
+
+        try {
+            // this MUST happen outside of the "synchronized" block
+            info.await();
+        } catch (InterruptedException e) {
+            // no longer sleeping - discard the stale entry so it cannot advance the clock
+            synchronized (locker) {
+                queue.remove(info);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Indicates that a thread has terminated or that it will no longer be invoking
+     * {@link #sleep(long)}. Awakens the next sleeping thread, if the queue is full after
+     * removing the terminated thread.
+     *
+     * @throws IllegalStateException if the queue is already full
+     */
+    public void threadCompleted() {
+        synchronized (locker) {
+            int sz = queue.size();
+            if (sz >= nthreads) {
+                throw new IllegalStateException("too many threads still sleeping");
+            }
+
+            --nthreads;
+
+            if (sz == nthreads) {
+                // after removing terminated thread - queue is now full; awaken something
+                wakeThreads();
+            }
+        }
+    }
+
+    /**
+     * Advances the "current" time and awakens any threads sleeping until that time.
+     */
+    private void wakeThreads() {
+        Info info = queue.poll();
+        if (info == null) {
+            return;
+        }
+
+        tcur = info.getAwakenAtMs();
+        info.wake();
+
+        // also release every other thread that is due at exactly the same time
+        while ((info = queue.peek()) != null && info.getAwakenAtMs() == tcur) {
+            queue.poll();
+            info.wake();
+        }
+    }
+
+    /**
+     * Info about a sleeping thread.
+     */
+    private static class Info implements Comparable<Info> {
+
+        /**
+         * Time, in milliseconds, at which the associated thread should awaken.
+         */
+        private final long awakenAtMs;
+
+        /**
+         * This is triggered when the associated thread should awaken.
+         */
+        private final CountDownLatch latch = new CountDownLatch(1);
+
+        /**
+         * @param awakenAtMs time, in milliseconds, at which the associated thread should
+         *        awaken
+         */
+        public Info(long awakenAtMs) {
+            this.awakenAtMs = awakenAtMs;
+        }
+
+        public long getAwakenAtMs() {
+            return awakenAtMs;
+        }
+
+        /**
+         * Awakens the associated thread by decrementing its latch.
+         */
+        public void wake() {
+            latch.countDown();
+        }
+
+        /**
+         * Blocks the current thread until awakened (i.e., until its latch is
+         * decremented).
+         *
+         * @throws InterruptedException if the thread is interrupted while waiting
+         */
+        public void await() throws InterruptedException {
+            latch.await();
+        }
+
+        @Override
+        public int compareTo(Info o) {
+            // Order by wake-up time only.  Ties need no tie-breaker: a PriorityQueue
+            // accepts elements that compare as equal, and entries having the same
+            // wake-up time are all released together by wakeThreads().
+            return Long.compare(awakenAtMs, o.awakenAtMs);
+        }
+    }
+}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java
new file mode 100644
index 0000000..206ab5f
--- /dev/null
+++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeMultiTest.java
@@ -0,0 +1,116 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Common Utils-Test
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+/**
+ * Exercises {@link TestTimeMulti} with several threads sleeping concurrently.
+ */
+public class TestTimeMultiTest {
+
+    private static final int NTHREADS = 10;
+    private static final int NTIMES = 100;
+    private static final long WAIT_SEC = 5L;
+    private static final long MIN_SLEEP_MS = 5L;
+
+    private TestTimeMulti ttm;
+    private Semaphore done;
+
+    @Test
+    public void test() throws Exception {
+        ttm = new TestTimeMulti(NTHREADS);
+        done = new Semaphore(0);
+
+        final long startMs = ttm.getMillis();
+
+        // build the workers, each with a different sleep interval
+        List<MyThread> workers = new ArrayList<>(NTHREADS);
+        for (int idx = 0; idx < NTHREADS; idx++) {
+            workers.add(new MyThread(MIN_SLEEP_MS + idx));
+        }
+
+        // start every worker
+        for (int idx = 0; idx < workers.size(); idx++) {
+            workers.get(idx).start();
+        }
+
+        // each worker releases "done" once, when its run() method finishes
+        for (MyThread worker : workers) {
+            boolean finished = done.tryAcquire(WAIT_SEC, TimeUnit.SECONDS);
+            assertTrue("complete " + worker.getSleepMs(), finished);
+            ttm.threadCompleted();
+        }
+
+        // the last time each worker observed must equal the last time it expected
+        for (MyThread worker : workers) {
+            assertEquals("time " + worker.getSleepMs(), worker.texpected, worker.tactual);
+        }
+
+        assertTrue(startMs + NTIMES * MIN_SLEEP_MS <= ttm.getMillis());
+    }
+
+    private class MyThread extends Thread {
+
+        private final long sleepMs;
+
+        private volatile long texpected;
+        private volatile long tactual;
+
+        public MyThread(long sleepMs) {
+            setDaemon(true);
+            this.sleepMs = sleepMs;
+        }
+
+        public long getSleepMs() {
+            return sleepMs;
+        }
+
+        @Override
+        public void run() {
+            try {
+                for (int iter = 0; iter < NTIMES; ++iter) {
+                    texpected = ttm.getMillis() + sleepMs;
+                    ttm.sleep(sleepMs);
+                    tactual = ttm.getMillis();
+                    if (tactual != texpected) {
+                        break;
+                    }
+
+                    tactual = ttm.getDate().getTime();
+                    if (tactual != texpected) {
+                        break;
+                    }
+                }
+            } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt();
+            }
+
+            done.release();
+        }
+    }
+}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeTest.java
new file mode 100644
index 0000000..c1e15b3
--- /dev/null
+++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/TestTimeTest.java
@@ -0,0 +1,66 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Common Utils-Test
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+
+/** Verifies that {@link TestTime} advances its simulated time without any real delay. */
+public class TestTimeTest {
+
+    @Test
+    public void test() throws Exception {
+        TestTime tm = new TestTime();
+        TestTime tm2 = new TestTime();
+        long treal = System.currentTimeMillis();
+
+        long tcur = tm.getMillis();
+        assertEquals(tcur, tm.getDate().getTime());
+
+        // tm2 is seeded independently of tm, so base its expected values on tm2 itself
+        long tsleep = 10000L;
+        long tcur2 = tm2.getMillis();
+
+        // sleep a bit and then check values
+        tcur2 += tsleep;
+        tm2.sleep(tsleep);
+        assertEquals(tcur2, tm2.getMillis());
+        assertEquals(tcur2, tm2.getDate().getTime());
+
+        // sleep some more and then check values
+        tcur2 += tsleep;
+        tm2.sleep(tsleep);
+        assertEquals(tcur2, tm2.getMillis());
+        assertEquals(tcur2, tm2.getDate().getTime());
+
+        // check again - to ensure unchanged
+        assertEquals(tcur2, tm2.getMillis());
+        assertEquals(tcur2, tm2.getDate().getTime());
+
+        // original should also be unchanged
+        assertEquals(tcur, tm.getMillis());
+        assertEquals(tcur, tm.getDate().getTime());
+
+        // ensure that no real time has elapsed
+        assertTrue(System.currentTimeMillis() < treal + tsleep / 2);
+    }
+}
diff --git a/utils/src/main/java/org/onap/policy/common/utils/time/CurrentTime.java b/utils/src/main/java/org/onap/policy/common/utils/time/CurrentTime.java
new file mode 100644
index 0000000..cab469e
--- /dev/null
+++ b/utils/src/main/java/org/onap/policy/common/utils/time/CurrentTime.java
@@ -0,0 +1,61 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Common Utils
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import java.util.Date;
+
+/**
+ * Source of the current time. Classes obtain time information through an object of this
+ * type, which lets junit tests substitute an implementation that they control.
+ */
+public class CurrentTime {
+
+    /**
+     * Creates a time source that reports the real system time.
+     */
+    public CurrentTime() {
+        // stateless - nothing to initialize
+    }
+
+    /**
+     * @return the present system time, in milliseconds
+     */
+    public long getMillis() {
+        return System.currentTimeMillis();
+    }
+
+    /**
+     * @return a new Date representing the present system time
+     */
+    public Date getDate() {
+        return new Date(System.currentTimeMillis());
+    }
+
+    /**
+     * Suspends the calling thread for the given amount of real time.
+     *
+     * @param sleepMs amount of time to sleep, in milliseconds
+     * @throws InterruptedException if the thread is interrupted while sleeping
+     */
+    public void sleep(long sleepMs) throws InterruptedException {
+        Thread.sleep(sleepMs);
+    }
+}
diff --git a/utils/src/test/java/org/onap/policy/common/utils/time/CurrentTimeTest.java b/utils/src/test/java/org/onap/policy/common/utils/time/CurrentTimeTest.java
new file mode 100644
index 0000000..694a3d2
--- /dev/null
+++ b/utils/src/test/java/org/onap/policy/common/utils/time/CurrentTimeTest.java
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Common Utils
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+
+/** Checks {@link CurrentTime} against readings taken directly from the system clock. */
+public class CurrentTimeTest {
+
+    @Test
+    public void testGetMillis() {
+        final long lower = System.currentTimeMillis();
+        final long first = new CurrentTime().getMillis();
+        final long second = new CurrentTime().getMillis();
+        final long upper = System.currentTimeMillis();
+        // each reading must lie within the window bracketing the calls
+        assertTrue(lower <= first && first <= upper);
+        assertTrue(lower <= second && second <= upper);
+    }
+
+    @Test
+    public void testGetDate() {
+        final long lower = System.currentTimeMillis();
+        final long first = new CurrentTime().getDate().getTime();
+        final long second = new CurrentTime().getDate().getTime();
+        final long upper = System.currentTimeMillis();
+        // each reading must lie within the window bracketing the calls
+        assertTrue(lower <= first && first <= upper);
+        assertTrue(lower <= second && second <= upper);
+    }
+
+    @Test
+    public void testSleep() throws Exception {
+        final long before = System.currentTimeMillis();
+        new CurrentTime().sleep(10);
+        final long after = System.currentTimeMillis();
+        // tolerates a 1 ms shortfall (presumably to allow for clock granularity)
+        assertTrue(before + 10 - 1 <= after);
+    }
+}