Sonar fixes for latest pooling code

Made changes to pooling code to address new sonar issues.
Add comments to awaitActive() methods.

Change-Id: I390173de00135a0a5fe50af82ed4ba780df9df80
Issue-ID: POLICY-728
Signed-off-by: Jim Hahn <jrh3@att.com>
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
index 2bec457..1e2071a 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -23,6 +23,7 @@
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.function.Function;
 import org.onap.policy.common.utils.properties.exception.PropertyException;
 import org.onap.policy.drools.controller.DroolsController;
@@ -67,7 +68,12 @@
     /**
      * Maps a controller name to its associated manager.
      */
-    private ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
+    private final ConcurrentHashMap<String, PoolingManagerImpl> ctlr2pool = new ConcurrentHashMap<>(107);
+
+    /**
+     * Decremented each time a manager enters the Active state. Used by junit tests.
+     */
+    private final CountDownLatch activeLatch = new CountDownLatch(1);
 
     /**
      * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is
@@ -102,6 +108,13 @@
         return host;
     }
 
+    /**
+     * @return a latch that will be decremented when a manager enters the active state
+     */
+    protected CountDownLatch getActiveLatch() {
+        return activeLatch;
+    }
+
     @Override
     public int getSequenceNumber() {
         return 0;
@@ -135,7 +148,7 @@
                 PoolingProperties props = new PoolingProperties(name, featProps);
 
                 logger.info("pooling enabled for {}", name);
-                ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props));
+                ctlr2pool.computeIfAbsent(name, xxx -> factory.makeManager(host, controller, props, activeLatch));
 
             } catch (PropertyException e) {
                 logger.error("pooling disabled due to exception for {}", name, e);
@@ -386,10 +399,12 @@
          * @param host name/uuid of this host
          * @param controller
          * @param props properties to use to configure the manager
+         * @param activeLatch decremented when the manager goes Active
          * @return a new pooling manager
          */
-        public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props) {
-            return new PoolingManagerImpl(host, controller, props);
+        public PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props,
+                        CountDownLatch activeLatch) {
+            return new PoolingManagerImpl(host, controller, props, activeLatch);
         }
 
         /**
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index d231246..de25e47 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -94,6 +94,11 @@
     private final TopicListener listener;
 
     /**
+     * Decremented each time the manager enters the Active state. Used by junit tests.
+     */
+    private final CountDownLatch activeLatch;
+
+    /**
      * Used to encode & decode request objects received from & sent to a rule engine.
      */
     private final Serializer serializer;
@@ -142,7 +147,7 @@
     /**
      * Queue used when no bucket assignments are available.
      */
-    private EventQueue eventq;
+    private final EventQueue eventq;
 
     /**
      * {@code True} if events offered by the controller should be intercepted,
@@ -156,11 +161,15 @@
      * @param host name/uuid of this host
      * @param controller controller with which this is associated
      * @param props feature properties specific to the controller
+     * @param activeLatch latch to be decremented each time the manager enters the Active
+     *        state
      */
-    public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props) {
+    public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props,
+                    CountDownLatch activeLatch) {
         this.host = host;
         this.controller = controller;
         this.props = props;
+        this.activeLatch = activeLatch;
 
         try {
             this.listener = (TopicListener) controller;
@@ -237,13 +246,13 @@
      * @return DMaaP properties
      */
     private Properties makeDmaapProps(PolicyController controller, Properties source) {
-        SpecProperties props = new SpecProperties("", "controller." + controller.getName(), source);
+        SpecProperties specProps = new SpecProperties("", "controller." + controller.getName(), source);
 
         // could be UEB or DMAAP, so add both
-        addDmaapConsumerProps(props, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
-        addDmaapConsumerProps(props, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
+        addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
+        addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
 
-        return props;
+        return specProps;
     }
 
     /**
@@ -255,7 +264,7 @@
      */
     private void addDmaapConsumerProps(SpecProperties props, String prefix) {
         String fullpfx = props.getSpecPrefix() + prefix + "." + topic;
-        
+
         props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host);
         props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0");
     }
@@ -429,12 +438,7 @@
         ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
 
         // wrap the future in a "CancellableScheduledTask"
-        return new CancellableScheduledTask() {
-            @Override
-            public void cancel() {
-                fut.cancel(false);
-            }
-        };
+        return () -> fut.cancel(false);
     }
 
     @Override
@@ -444,12 +448,7 @@
                         TimeUnit.MILLISECONDS);
 
         // wrap the future in a "CancellableScheduledTask"
-        return new CancellableScheduledTask() {
-            @Override
-            public void cancel() {
-                fut.cancel(false);
-            }
-        };
+        return () -> fut.cancel(false);
     }
 
     @Override
@@ -633,7 +632,7 @@
                             topic);
 
         } else {
-            logger.warn("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
+            logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
             event.bumpNumHops();
             publish(target, event);
         }
@@ -829,6 +828,7 @@
 
     @Override
     public State goActive() {
+        activeLatch.countDown();
         return new ActiveState(this);
     }
 
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
index b62ea0a..c831f70 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/SpecProperties.java
@@ -107,11 +107,11 @@
 
     @Override
     public final int hashCode() {
-        throw new UnsupportedOperationException("HostBucket cannot be hashed");
+        throw new UnsupportedOperationException("SpecProperties cannot be hashed");
     }
 
     @Override
     public final boolean equals(Object obj) {
-        throw new UnsupportedOperationException("cannot compare HostBuckets");
+        throw new UnsupportedOperationException("cannot compare SpecProperties");
     }
 }
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
index da04425..f717aa5 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/InactiveState.java
@@ -45,7 +45,7 @@
     @Override
     public void start() {
         super.start();
-        schedule(getProperties().getReactivateMs(), () -> goStart());
+        schedule(getProperties().getReactivateMs(), this::goStart);
     }
 
     @Override
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
index 54e9323..545c2ef 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -122,7 +122,7 @@
     protected State goActive(BucketAssignments asgn) {
         startDistributing(asgn);
 
-        if (asgn.hasAssignment(getHost())) {
+        if (asgn != null && asgn.hasAssignment(getHost())) {
             return mgr.goActive();
 
         } else {
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
index 29dc15e..a5688df 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
@@ -34,6 +34,7 @@
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -264,11 +265,15 @@
         long minms = 2000L;
 
         // tell the publisher to stop in minms + additional time
-        Thread thread = new Thread(() -> mgr.stopPublisher(minms + 3000L));
+        CountDownLatch latch = new CountDownLatch(1);
+        Thread thread = new Thread(() -> {
+            latch.countDown();
+            mgr.stopPublisher(minms + 3000L);
+        });
         thread.start();
 
-        // give the thread a chance to start
-        Thread.sleep(50L);
+        // wait for the thread to start
+        latch.await();
 
         // interrupt it - it should immediately finish its work
         thread.interrupt();
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
index 84449e7..6884bec 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
@@ -189,8 +189,7 @@
         }
 
         ctx.startHosts();
-
-        ctx.awaitEvents(STD_IDENTIFICATION_MS * 2, TimeUnit.MILLISECONDS);
+        ctx.awaitAllActive(STD_IDENTIFICATION_MS * 2);
 
         for (int x = 0; x < nmessages; ++x) {
             ctx.offerExternal(makeMessage(x));
@@ -414,6 +413,21 @@
         public boolean awaitEvents(long time, TimeUnit units) throws InterruptedException {
             return eventCounter.await(time, units);
         }
+
+        /**
+         * Waits, for a period of time, for all hosts to enter the Active state.
+         * 
+         * @param timeMs maximum time to wait, in milliseconds
+         * @throws InterruptedException
+         */
+        public void awaitAllActive(long timeMs) throws InterruptedException {
+            long tend = timeMs + System.currentTimeMillis();
+
+            for (Host host : hosts) {
+                long tremain = Math.max(0, tend - System.currentTimeMillis());
+                assertTrue(host.awaitActive(tremain));
+            }
+        }
     }
 
     /**
@@ -459,6 +473,18 @@
         }
 
         /**
+         * Waits, for a period of time, for the host to enter the Active state.
+         * 
+         * @param timeMs time to wait, in milliseconds
+         * @return {@code true} if the host entered the Active state within the given
+         *         amount of time, {@code false} otherwise
+         * @throws InterruptedException
+         */
+        public boolean awaitActive(long timeMs) throws InterruptedException {
+            return feature.getActiveLatch().await(timeMs, TimeUnit.MILLISECONDS);
+        }
+
+        /**
          * Starts threads for the host so that it begins consuming from both the external
          * "DMaaP" topic and its own internal "DMaaP" topic.
          */
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
index 9ee2d97..f8f3755 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
@@ -123,7 +123,7 @@
         when(factory.getController(drools2)).thenReturn(controller2);
         when(factory.getController(droolsDisabled)).thenReturn(controllerDisabled);
 
-        when(factory.makeManager(any(), any(), any())).thenAnswer(args -> {
+        when(factory.makeManager(any(), any(), any(), any())).thenAnswer(args -> {
             PoolingProperties props = args.getArgument(2);
 
             PoolingManagerImpl mgr = mock(PoolingManagerImpl.class);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
index 693cb6d..e0024b7 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
@@ -113,6 +113,7 @@
     private DroolsController drools;
     private Serializer ser;
     private Factory factory;
+    private CountDownLatch active;
 
     private PoolingManagerImpl mgr;
 
@@ -142,6 +143,7 @@
 
         futures = new LinkedList<>();
         ser = new Serializer();
+        active = new CountDownLatch(1);
 
         factory = mock(Factory.class);
         eventQueue = mock(EventQueue.class);
@@ -181,7 +183,7 @@
 
         PoolingManagerImpl.setFactory(factory);
 
-        mgr = new PoolingManagerImpl(MY_HOST, controller, poolProps);
+        mgr = new PoolingManagerImpl(MY_HOST, controller, poolProps, active);
     }
 
     @Test
@@ -204,7 +206,7 @@
         PolicyController ctlr = mock(PolicyController.class);
 
         PoolingFeatureRtException ex = expectException(PoolingFeatureRtException.class,
-                        () -> new PoolingManagerImpl(MY_HOST, ctlr, poolProps));
+                        () -> new PoolingManagerImpl(MY_HOST, ctlr, poolProps, active));
         assertNotNull(ex.getCause());
         assertTrue(ex.getCause() instanceof ClassCastException);
     }
@@ -216,7 +218,7 @@
         when(factory.makeDmaapManager(any(), any())).thenThrow(ex);
 
         PoolingFeatureRtException ex2 = expectException(PoolingFeatureRtException.class,
-                        () -> new PoolingManagerImpl(MY_HOST, controller, poolProps));
+                        () -> new PoolingManagerImpl(MY_HOST, controller, poolProps, active));
         assertEquals(ex, ex2.getCause());
     }
 
@@ -233,7 +235,7 @@
     public void testGetHost() {
         assertEquals(MY_HOST, mgr.getHost());
 
-        mgr = new PoolingManagerImpl(HOST2, controller, poolProps);
+        mgr = new PoolingManagerImpl(HOST2, controller, poolProps, active);
         assertEquals(HOST2, mgr.getHost());
     }
 
@@ -1102,7 +1104,6 @@
 
         // route the messages to this host
         CountDownLatch latch = mgr.startDistributing(makeAssignments(true));
-        assertNotNull(latch);
         assertTrue(latch.await(2, TimeUnit.SECONDS));
 
         // all of the events should have been processed locally
@@ -1123,7 +1124,8 @@
         when(eventQueue.poll()).thenAnswer(args -> lst.poll());
 
         // route the messages to the OTHER host
-        assertTrue(mgr.startDistributing(makeAssignments(false)).await(2, TimeUnit.SECONDS));
+        CountDownLatch latch = mgr.startDistributing(makeAssignments(false));
+        assertTrue(latch.await(2, TimeUnit.SECONDS));
 
         // all of the events should have been forwarded
         verify(dmaap, times(4)).publish(any());
@@ -1159,6 +1161,7 @@
         assertTrue(st instanceof ActiveState);
         assertEquals(mgr.getHost(), st.getHost());
         assertEquals(asgn, mgr.getAssignments());
+        assertEquals(0, active.getCount());
     }
 
     @Test
@@ -1166,6 +1169,7 @@
         State st = mgr.goInactive();
         assertTrue(st instanceof InactiveState);
         assertEquals(mgr.getHost(), st.getHost());
+        assertEquals(1, active.getCount());
     }
 
     @Test
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
index 08b55c6..47624aa 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
@@ -186,6 +186,19 @@
     }
 
     @Test
+    public void testGoActive_NullAssignment() {
+        State act = mock(State.class);
+        State inact = mock(State.class);
+
+        when(mgr.goActive()).thenReturn(act);
+        when(mgr.goInactive()).thenReturn(inact);
+
+        assertEquals(inact, state.goActive(null));
+
+        verify(mgr, never()).startDistributing(any());
+    }
+
+    @Test
     public void testGoInactive() {
         State next = mock(State.class);
         when(mgr.goInactive()).thenReturn(next);