Merge "Distinguish lock from refresh"
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java
deleted file mode 100644
index 0bed85b..0000000
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/EventQueue.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling;
-
-import java.util.Deque;
-import java.util.LinkedList;
-import org.onap.policy.drools.pooling.message.Forward;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Finite queue of events waiting to be processed once the buckets have been
- * assigned.
- */
-public class EventQueue {
-
-    private static final Logger logger = LoggerFactory.getLogger(EventQueue.class);
-
-    /**
-     * Maximum number of events allowed in the queue. When excess events are
-     * added, the older events are removed.
-     */
-    private int maxEvents;
-
-    /**
-     * Maximum age, in milliseconds, of events in the queue. Events that are
-     * older than this are discarded rather than being handed off when
-     * {@link #poll()} is invoked.
-     */
-    private long maxAgeMs;
-
-    /**
-     * The actual queue of events.
-     */
-    private Deque<Forward> events = new LinkedList<>();
-
-    /**
-     * 
-     * @param maxEvents maximum number of events to hold in the queue
-     * @param maxAgeMs maximum age of events in the queue
-     */
-    public EventQueue(int maxEvents, long maxAgeMs) {
-        this.maxEvents = maxEvents;
-        this.maxAgeMs = maxAgeMs;
-    }
-
-    /**
-     * 
-     * @return {@code true} if the queue is empty, {@code false} otherwise
-     */
-    public boolean isEmpty() {
-        return events.isEmpty();
-    }
-
-    /**
-     * Clears the queue.
-     */
-    public void clear() {
-        events.clear();
-    }
-
-    /**
-     * 
-     * @return the number of elements in the queue
-     */
-    public int size() {
-        return events.size();
-    }
-
-    /**
-     * Adds an item to the queue. If the queue is full, the older item is
-     * removed and discarded.
-     * 
-     * @param event
-     */
-    public void add(Forward event) {
-        if (events.size() >= maxEvents) {
-            logger.warn("full queue - discarded event for topic {}", event.getTopic());
-            events.remove();
-        }
-
-        events.add(event);
-    }
-
-    /**
-     * Gets the oldest, un-expired event from the queue.
-     * 
-     * @return the oldest, un-expired event
-     */
-    public Forward poll() {
-        long tmin = System.currentTimeMillis() - maxAgeMs;
-
-        Forward ev;
-        while ((ev = events.poll()) != null) {
-            if (!ev.isExpired(tmin)) {
-                break;
-            }
-        }
-
-        return ev;
-    }
-
-}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
index 5036b60..c25dc12 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManager.java
@@ -20,7 +20,6 @@
 
 package org.onap.policy.drools.pooling;
 
-import java.util.concurrent.CountDownLatch;
 import org.onap.policy.drools.pooling.message.BucketAssignments;
 import org.onap.policy.drools.pooling.message.Forward;
 import org.onap.policy.drools.pooling.message.Message;
@@ -55,22 +54,11 @@
     public String getTopic();
 
     /**
-     * Indicates that communication with internal DMaaP topic failed, typically due to a
-     * missed heart beat. Stops the PolicyController.
-     * 
-     * @return a latch that can be used to determine when the controller's stop() method
-     *         has completed
-     */
-    public CountDownLatch internalTopicFailed();
-
-    /**
      * Starts distributing requests according to the given bucket assignments.
      * 
-     * @param assignments must <i>not</i> be {@code null}
+     * @param assignments new bucket assignments, or {@code null} to discard them
-     * @return a latch that can be used to determine when the events in the event queue
-     *         have all be processed
      */
-    public CountDownLatch startDistributing(BucketAssignments assignments);
+    public void startDistributing(BucketAssignments assignments);
 
     /**
      * Gets the current bucket assignments.
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index 86cec4c..68dfee1 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -145,11 +145,6 @@
     private ScheduledThreadPoolExecutor scheduler = null;
 
     /**
-     * Queue used when no bucket assignments are available.
-     */
-    private final EventQueue eventq;
-
-    /**
      * {@code True} if events offered by the controller should be intercepted,
      * {@code false} otherwise.
      */
@@ -175,7 +170,6 @@
             this.listener = (TopicListener) controller;
             this.serializer = new Serializer();
             this.topic = props.getPoolingTopic();
-            this.eventq = factory.makeEventQueue(props);
             this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource()));
             this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic());
             this.current = new IdleState(this);
@@ -307,11 +301,6 @@
      */
     public void afterStop() {
         synchronized (curLocker) {
-            if (!eventq.isEmpty()) {
-                logger.warn("discarded {} messages after stopping topic {}", eventq.size(), topic);
-                eventq.clear();
-            }
-
             /*
              * stop the publisher, but allow time for any Offline message to be
              * transmitted
@@ -381,26 +370,6 @@
     }
 
     @Override
-    public CountDownLatch internalTopicFailed() {
-        logger.error("communication failed for topic {}", topic);
-
-        CountDownLatch latch = new CountDownLatch(1);
-
-        /*
-         * We don't want to build up items in our queue if we can't forward them to other
-         * hosts, so we just stop the controller.
-         * 
-         * Use a background thread to prevent deadlocks.
-         */
-        new Thread(() -> {
-            controller.stop();
-            latch.countDown();
-        }).start();
-
-        return latch;
-    }
-
-    @Override
     public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
         // wrap the task in a TimerAction and schedule it
         ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
@@ -556,12 +525,11 @@
      */
     private boolean handleExternal(Forward event) {
         if (assignments == null) {
-            // no bucket assignments yet - add it to the queue
-            logger.info("queued event for request {}", event.getRequestId());
-            eventq.add(event);
+            // no bucket assignments yet - handle locally
+            logger.info("handle event locally for request {}", event.getRequestId());
 
-            // we've consumed the event
-            return true;
+            // we did NOT consume the event
+            return false;
 
         } else {
             return handleEvent(event);
@@ -741,42 +709,12 @@
     }
 
     @Override
-    public CountDownLatch startDistributing(BucketAssignments asgn) {
+    public void startDistributing(BucketAssignments asgn) {
         synchronized (curLocker) {
             int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
             logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
             assignments = asgn;
         }
-
-        if (asgn == null) {
-            return null;
-        }
-
-        /*
-         * publish the events from the event queue, but do it in a background thread so
-         * that the state machine can enter its correct state BEFORE we start processing
-         * the events
-         */
-        CountDownLatch latch = new CountDownLatch(1);
-
-        new Thread(() -> {
-            synchronized (curLocker) {
-                if (assignments == null) {
-                    latch.countDown();
-                    return;
-                }
-
-                // now that we have assignments, we can process the queue
-                Forward ev;
-                while ((ev = eventq.poll()) != null) {
-                    handle(ev);
-                }
-
-                latch.countDown();
-            }
-        }).start();
-
-        return latch;
     }
 
     @Override
@@ -846,16 +784,6 @@
     public static class Factory {
 
         /**
-         * Creates an event queue.
-         * 
-         * @param props properties used to configure the event queue
-         * @return a new event queue
-         */
-        public EventQueue makeEventQueue(PoolingProperties props) {
-            return new EventQueue(props.getOfflineLimit(), props.getOfflineAgeMs());
-        }
-
-        /**
          * Creates object extractors.
          * 
          * @param props properties used to configure the extractors
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
index fcb0e13..a1be2a7 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/state/State.java
@@ -147,11 +147,12 @@
      * @return the new state, or {@code null} if the state is unchanged
      */
     public State process(Forward msg) {
-        if(!getHost().equals(msg.getChannel())) {
-            logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(), getTopic());
+        if (!getHost().equals(msg.getChannel())) {
+            logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(),
+                            getTopic());
             return null;
         }
-        
+
         logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic());
         mgr.handle(msg);
         return null;
@@ -337,26 +338,26 @@
 
     /**
      * Indicates that we failed to see our own heartbeat; must be a problem with the
-     * internal topic.
+     * internal topic. Assumes the problem is temporary and continues to use the current
+     * bucket assignments.
      * 
      * @return a new {@link StartState}
      */
     protected final State missedHeartbeat() {
         publish(makeOffline());
-        mgr.startDistributing(null);
 
         return mgr.goStart();
     }
 
     /**
      * Indicates that the internal topic failed; this should only be invoked from the
-     * StartState.
+     * StartState. Discards bucket assignments and begins processing everything locally.
      * 
      * @return a new {@link InactiveState}
      */
     protected final State internalTopicFailed() {
         publish(makeOffline());
-        mgr.internalTopicFailed();
+        mgr.startDistributing(null);
 
         return mgr.goInactive();
     }
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java
deleted file mode 100644
index 2414468..0000000
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/EventQueueTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import java.util.LinkedList;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.drools.pooling.message.Forward;
-
-public class EventQueueTest {
-
-    private static final int MAX_SIZE = 5;
-    private static final long MAX_AGE_MS = 3000L;
-
-    private static final String MY_SOURCE = "my.source";
-    private static final CommInfrastructure MY_PROTO = CommInfrastructure.UEB;
-    private static final String MY_TOPIC = "my.topic";
-    private static final String MY_PAYLOAD = "my.payload";
-    private static final String MY_REQID = "my.request.id";
-
-    private EventQueue queue;
-
-    @Before
-    public void setUp() {
-        queue = new EventQueue(MAX_SIZE, MAX_AGE_MS);
-
-    }
-
-    @Test
-    public void testEventQueue() {
-        // shouldn't generate an exception
-        new EventQueue(1, 1);
-    }
-
-    @Test
-    public void testClear() {
-        // add some items
-        queue.add(makeActive());
-        queue.add(makeActive());
-
-        assertFalse(queue.isEmpty());
-
-        queue.clear();
-
-        // should be empty now
-        assertTrue(queue.isEmpty());
-    }
-
-    @Test
-    public void testIsEmpty() {
-        // test when empty
-        assertTrue(queue.isEmpty());
-
-        // all active
-        Forward msg1 = makeActive();
-        Forward msg2 = makeActive();
-        queue.add(msg1);
-        assertFalse(queue.isEmpty());
-
-        queue.add(msg2);
-        assertFalse(queue.isEmpty());
-
-        assertEquals(msg1, queue.poll());
-        assertFalse(queue.isEmpty());
-
-        assertEquals(msg2, queue.poll());
-        assertTrue(queue.isEmpty());
-
-        // active, expired, expired, active
-        queue.add(msg1);
-        queue.add(makeInactive());
-        queue.add(makeInactive());
-        queue.add(msg2);
-
-        assertEquals(msg1, queue.poll());
-        assertFalse(queue.isEmpty());
-
-        assertEquals(msg2, queue.poll());
-        assertTrue(queue.isEmpty());
-    }
-
-    @Test
-    public void testSize() {
-        queue = new EventQueue(2, 1000L);
-        assertEquals(0, queue.size());
-
-        queue.add(makeActive());
-        assertEquals(1, queue.size());
-
-        queue.poll();
-        assertEquals(0, queue.size());
-
-        queue.add(makeActive());
-        queue.add(makeActive());
-        assertEquals(2, queue.size());
-
-        queue.poll();
-        assertEquals(1, queue.size());
-
-        queue.poll();
-        assertEquals(0, queue.size());
-    }
-
-    @Test
-    public void testAdd() {
-        int nextra = 3;
-
-        // create excess messages
-        LinkedList<Forward> msgs = new LinkedList<>();
-        for (int x = 0; x < MAX_SIZE + nextra; ++x) {
-            msgs.add(makeActive());
-        }
-
-        // add them to the queue
-        msgs.forEach(msg -> queue.add(msg));
-
-        // should not have added too many messages
-        assertEquals(MAX_SIZE, queue.size());
-
-        // should have discarded the first "nextra" items
-        for (int x = 0; x < MAX_SIZE; ++x) {
-            assertEquals("x=" + x, msgs.get(x + nextra), queue.poll());
-        }
-
-        assertEquals(null, queue.poll());
-    }
-
-    @Test
-    public void testPoll() {
-        // poll when empty
-        assertNull(queue.poll());
-
-        // all active
-        Forward msg1 = makeActive();
-        Forward msg2 = makeActive();
-        queue.add(msg1);
-        queue.add(msg2);
-
-        assertEquals(msg1, queue.poll());
-        assertEquals(msg2, queue.poll());
-        assertEquals(null, queue.poll());
-
-        // active, expired, expired, active
-        queue.add(msg1);
-        queue.add(makeInactive());
-        queue.add(makeInactive());
-        queue.add(msg2);
-
-        assertEquals(msg1, queue.poll());
-        assertEquals(msg2, queue.poll());
-        assertEquals(null, queue.poll());
-
-        // one that's close to the age limit
-        msg1 = makeActive();
-        msg1.setCreateTimeMs(System.currentTimeMillis() - MAX_AGE_MS + 100);
-        queue.add(msg1);
-        assertEquals(msg1, queue.poll());
-        assertEquals(null, queue.poll());
-    }
-
-    private Forward makeActive() {
-        return new Forward(MY_SOURCE, MY_PROTO, MY_TOPIC, MY_PAYLOAD, MY_REQID);
-    }
-
-    private Forward makeInactive() {
-        Forward msg = new Forward(MY_SOURCE, MY_PROTO, MY_TOPIC, MY_PAYLOAD, MY_REQID);
-
-        msg.setCreateTimeMs(System.currentTimeMillis() - MAX_AGE_MS - 100);
-
-        return msg;
-    }
-
-}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
index 298af06..6280ebe 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
@@ -28,9 +28,11 @@
 import static org.mockito.Mockito.when;
 import static org.onap.policy.drools.pooling.PoolingProperties.PREFIX;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Deque;
 import java.util.IdentityHashMap;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Properties;
 import java.util.TreeMap;
 import java.util.concurrent.CountDownLatch;
@@ -121,6 +123,7 @@
     // these are saved and restored on exit from this test class
     private static PoolingFeature.Factory saveFeatureFactory;
     private static PoolingManagerImpl.Factory saveManagerFactory;
+    private static DmaapManager.Factory saveDmaapFactory;
 
     /**
      * Sink for external DMaaP topic.
@@ -128,6 +131,11 @@
     private static TopicSink externalSink;
 
     /**
+     * Sink for internal DMaaP topic.
+     */
+    private static TopicSink internalSink;
+
+    /**
      * Context for the current test case.
      */
     private Context ctx;
@@ -137,18 +145,23 @@
     public static void setUpBeforeClass() {
         saveFeatureFactory = PoolingFeature.getFactory();
         saveManagerFactory = PoolingManagerImpl.getFactory();
+        saveDmaapFactory = DmaapManager.getFactory();
 
-        Properties props = makeSinkProperties(EXTERNAL_TOPIC);
-        externalSink = TopicEndpoint.manager.addTopicSinks(props).get(0);
+        externalSink = TopicEndpoint.manager.addTopicSinks(makeSinkProperties(EXTERNAL_TOPIC)).get(0);
         externalSink.start();
+
+        internalSink = TopicEndpoint.manager.addTopicSinks(makeSinkProperties(INTERNAL_TOPIC)).get(0);
+        internalSink.start();
     }
 
     @AfterClass
     public static void tearDownAfterClass() {
         PoolingFeature.setFactory(saveFeatureFactory);
         PoolingManagerImpl.setFactory(saveManagerFactory);
+        DmaapManager.setFactory(saveDmaapFactory);
 
         externalSink.stop();
+        internalSink.stop();
     }
 
     @Before
@@ -443,6 +456,7 @@
         private final AtomicBoolean sawMsg = new AtomicBoolean(false);
 
         private final TopicSource externalSource;
+        private final TopicSource internalSource;
 
         // mock objects
         private final PolicyEngine engine = mock(PolicyEngine.class);
@@ -458,8 +472,8 @@
             when(controller.getName()).thenReturn(CONTROLLER1);
             when(controller.getDrools()).thenReturn(drools);
 
-            Properties props = makeSourceProperties(EXTERNAL_TOPIC);
-            externalSource = TopicEndpoint.manager.addTopicSources(props).get(0);
+            externalSource = TopicEndpoint.manager.addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)).get(0);
+            internalSource = TopicEndpoint.manager.addTopicSources(makeSourceProperties(INTERNAL_TOPIC)).get(0);
 
             // stop consuming events if the controller stops
             when(controller.stop()).thenAnswer(args -> {
@@ -489,6 +503,18 @@
          * "DMaaP" topic and its own internal "DMaaP" topic.
          */
         public void start() {
+            DmaapManager.setFactory(new DmaapManager.Factory() {
+                @Override
+                public List<TopicSource> getTopicSources() {
+                    return Arrays.asList(internalSource, externalSource);
+                }
+
+                @Override
+                public List<TopicSink> getTopicSinks() {
+                    return Arrays.asList(internalSink, externalSink);
+                }
+            });
+
             feature.beforeStart(engine);
             feature.afterCreate(controller);
 
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
index 64573ab..d74b87f 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
@@ -23,7 +23,6 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -106,7 +105,6 @@
     private Properties plainProps;
     private PoolingProperties poolProps;
     private ListeningController controller;
-    private EventQueue eventQueue;
     private ClassExtractors extractors;
     private DmaapManager dmaap;
     private ScheduledThreadPoolExecutor sched;
@@ -146,14 +144,12 @@
         active = new CountDownLatch(1);
 
         factory = mock(Factory.class);
-        eventQueue = mock(EventQueue.class);
         extractors = mock(ClassExtractors.class);
         dmaap = mock(DmaapManager.class);
         controller = mock(ListeningController.class);
         sched = mock(ScheduledThreadPoolExecutor.class);
         drools = mock(DroolsController.class);
 
-        when(factory.makeEventQueue(any())).thenReturn(eventQueue);
         when(factory.makeClassExtractors(any())).thenReturn(extractors);
         when(factory.makeDmaapManager(any())).thenReturn(dmaap);
         when(factory.makeScheduler()).thenReturn(sched);
@@ -304,24 +300,25 @@
     @Test
     public void testBeforeStop() throws Exception {
         startMgr();
-        mgr.startDistributing(makeAssignments(true));
+        mgr.startDistributing(makeAssignments(false));
 
-        // verify that this message is not queued
         Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
         mgr.handle(msg);
-        verify(eventQueue, never()).add(msg);
+        verify(dmaap, times(START_PUB+1)).publish(any());
         
         mgr.beforeStop();
 
         verify(dmaap).stopConsumer(mgr);
         verify(sched).shutdownNow();
+        verify(dmaap, times(START_PUB+2)).publish(any());
         verify(dmaap).publish(contains("offline"));
 
         assertTrue(mgr.getCurrent() instanceof IdleState);
 
-        // verify that next message is queued
-        mgr.handle(msg);        
-        verify(eventQueue).add(msg);
+        // verify that next message is handled locally
+        mgr.handle(msg);
+        verify(dmaap, times(START_PUB+2)).publish(any());
+        verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
     }
 
     @Test
@@ -360,26 +357,8 @@
         startMgr();
         mgr.beforeStop();
 
-        when(eventQueue.isEmpty()).thenReturn(false);
-        when(eventQueue.size()).thenReturn(3);
-
         mgr.afterStop();
 
-        verify(eventQueue).clear();
-        verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
-    }
-
-    @Test
-    public void testAfterStop_EmptyQueue() throws Exception {
-        startMgr();
-        mgr.beforeStop();
-
-        when(eventQueue.isEmpty()).thenReturn(true);
-        when(eventQueue.size()).thenReturn(0);
-
-        mgr.afterStop();
-
-        verify(eventQueue, never()).clear();
         verify(dmaap).stopPublisher(STD_OFFLINE_PUB_WAIT_MS);
     }
 
@@ -499,18 +478,6 @@
     }
 
     @Test
-    public void testInternalTopicFailed() throws Exception {
-        startMgr();
-
-        CountDownLatch latch = mgr.internalTopicFailed();
-
-        // wait for the thread to complete
-        assertTrue(latch.await(2, TimeUnit.SECONDS));
-
-        verify(controller).stop();
-    }
-
-    @Test
     public void testSchedule() throws Exception {
         // must start the scheduler
         startMgr();
@@ -693,14 +660,7 @@
     public void testBeforeInsert_NoIntercept() throws Exception {
         startMgr();
 
-        long tbegin = System.currentTimeMillis();
-
-        assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
-        ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
-        verify(eventQueue).add(msgCap.capture());
-
-        validateMessageContent(tbegin, msgCap.getValue());
+        assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
     }
 
     @Test
@@ -726,37 +686,20 @@
         startMgr();
 
         assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
-
-        // should not have tried to enqueue a message
-        verify(eventQueue, never()).add(any());
     }
 
     @Test
     public void testHandleExternalCommInfrastructureStringStringString() throws Exception {
         startMgr();
 
-        long tbegin = System.currentTimeMillis();
-
-        assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
-        ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
-        verify(eventQueue).add(msgCap.capture());
-
-        validateMessageContent(tbegin, msgCap.getValue());
+        assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
     }
 
     @Test
     public void testHandleExternalForward_NoAssignments() throws Exception {
         startMgr();
 
-        long tbegin = System.currentTimeMillis();
-
-        assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
-        ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
-        verify(eventQueue).add(msgCap.capture());
-
-        validateMessageContent(tbegin, msgCap.getValue());
+        assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
     }
 
     @Test
@@ -921,25 +864,26 @@
     @Test
     public void testMakeForward() throws Exception {
         startMgr();
-
-        long tbegin = System.currentTimeMillis();
+
+        // route the message to another host
+        mgr.startDistributing(makeAssignments(false));
 
         assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
-        ArgumentCaptor<Forward> msgCap = ArgumentCaptor.forClass(Forward.class);
-        verify(eventQueue).add(msgCap.capture());
-
-        validateMessageContent(tbegin, msgCap.getValue());
+
+        verify(dmaap, times(START_PUB+1)).publish(any());
     }
 
     @Test
     public void testMakeForward_InvalidMsg() throws Exception {
         startMgr();
+
+        // route the message to another host
+        mgr.startDistributing(makeAssignments(false));
 
         assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
 
-        // should not have tried to enqueue a message
-        verify(eventQueue, never()).add(any());
+        // should not have tried to publish a message
+        verify(dmaap, times(START_PUB)).publish(any());
     }
 
     @Test
@@ -1064,69 +1008,27 @@
         startMgr();
 
         // route the message to this host
-        assertNotNull(mgr.startDistributing(makeAssignments(true)));
+        mgr.startDistributing(makeAssignments(true));
         assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-        verify(eventQueue, never()).add(any());
+        verify(dmaap, times(START_PUB)).publish(any());
 
 
-        // null assignments should cause message to be queued
-        assertNull(mgr.startDistributing(null));
-        assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-        verify(eventQueue).add(any());
+        // null assignments should cause message to be processed locally
+        mgr.startDistributing(null);
+        assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+        verify(dmaap, times(START_PUB)).publish(any());
 
 
         // route the message to this host
-        assertNotNull(mgr.startDistributing(makeAssignments(true)));
+        mgr.startDistributing(makeAssignments(true));
         assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-        verify(eventQueue).add(any());
+        verify(dmaap, times(START_PUB)).publish(any());
 
 
         // route the message to the other host
-        assertNotNull(mgr.startDistributing(makeAssignments(false)));
+        mgr.startDistributing(makeAssignments(false));
         assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-        verify(eventQueue).add(any());
-    }
-
-    @Test
-    public void testStartDistributing_EventsInQueue_ProcessLocally() throws Exception {
-        startMgr();
-
-        // put items in the queue
-        LinkedList<Forward> lst = new LinkedList<>();
-        lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
-        lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
-        lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
-
-        when(eventQueue.poll()).thenAnswer(args -> lst.poll());
-
-        // route the messages to this host
-        CountDownLatch latch = mgr.startDistributing(makeAssignments(true));
-        assertTrue(latch.await(2, TimeUnit.SECONDS));
-
-        // all of the events should have been processed locally
-        verify(dmaap, times(START_PUB)).publish(any());
-        verify(controller, times(3)).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-    }
-
-    @Test
-    public void testStartDistributing_EventsInQueue_Forward() throws Exception {
-        startMgr();
-
-        // put items in the queue
-        LinkedList<Forward> lst = new LinkedList<>();
-        lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
-        lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
-        lst.add(new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID));
-
-        when(eventQueue.poll()).thenAnswer(args -> lst.poll());
-
-        // route the messages to the OTHER host
-        CountDownLatch latch = mgr.startDistributing(makeAssignments(false));
-        assertTrue(latch.await(2, TimeUnit.SECONDS));
-
-        // all of the events should have been forwarded
-        verify(dmaap, times(4)).publish(any());
-        verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+        verify(dmaap, times(START_PUB+1)).publish(any());
     }
 
     @Test
@@ -1228,22 +1130,6 @@
     }
 
     /**
-     * Validates the message content.
-     * 
-     * @param tbegin creation time stamp must be no less than this
-     * @param msg message to be validated
-     */
-    private void validateMessageContent(long tbegin, Forward msg) {
-        assertEquals(0, msg.getNumHops());
-        assertTrue(msg.getCreateTimeMs() >= tbegin);
-        assertEquals(mgr.getHost(), msg.getSource());
-        assertEquals(CommInfrastructure.UEB, msg.getProtocol());
-        assertEquals(TOPIC2, msg.getTopic());
-        assertEquals(THE_EVENT, msg.getPayload());
-        assertEquals(REQUEST_ID, msg.getRequestId());
-    }
-
-    /**
      * Configure the mock controller to act like a real controller, invoking beforeOffer
      * and then beforeInsert, so we can make sure they pass through. We'll keep count to
      * ensure we don't get into infinite recursion.
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
index 27284dc..f270103 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/ActiveStateTest.java
@@ -394,8 +394,8 @@
         // fire the task - should transition
         assertEquals(next, task.third().fire());
 
-        // should stop distributing
-        verify(mgr).startDistributing(null);
+        // should continue to distribute
+        verify(mgr, never()).startDistributing(null);
 
         // should publish an offline message
         Offline msg = captureAdminMessage(Offline.class);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
index 80778ed..7cd3758 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/QueryStateTest.java
@@ -257,8 +257,8 @@
 
         assertEquals(next, timer.second().fire());
 
-        // should stop distributing
-        verify(mgr).startDistributing(null);
+        // should continue to distribute
+        verify(mgr, never()).startDistributing(null);
 
         Offline msg = captureAdminMessage(Offline.class);
         assertEquals(MY_HOST, msg.getSource());
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
index 01f49b5..18f12ff 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StartStateTest.java
@@ -111,7 +111,7 @@
 
         assertEquals(next, checker.second().fire());
 
-        verify(mgr).internalTopicFailed();
+        verify(mgr).startDistributing(null);
     }
 
     @Test
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
index cdf9b59..e3d383d 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/state/StateTest.java
@@ -423,7 +423,8 @@
         State next2 = state.missedHeartbeat();
         assertEquals(next, next2);
 
-        verify(mgr).startDistributing(null);
+        // should continue to distribute
+        verify(mgr, never()).startDistributing(null);
 
         Offline msg = captureAdminMessage(Offline.class);
         assertEquals(MY_HOST, msg.getSource());
@@ -437,7 +438,8 @@
         State next2 = state.internalTopicFailed();
         assertEquals(next, next2);
 
-        verify(mgr).internalTopicFailed();
+        // should stop distributing
+        verify(mgr).startDistributing(null);
 
         Offline msg = captureAdminMessage(Offline.class);
         assertEquals(MY_HOST, msg.getSource());