Fix various problems in pooling

Renamed META-INF SessionAPI to EngineAPI, as it implements the latter.
Created default PoolingFeature.factory object.
Don't delete a controller's pooling manager when stop is called; do
that in afterHalt and afterShutdown.  This enables it to be restarted
as long as the controller still exists.
Only stop & start the internal DMaaP topic at the engine level instead
of the controller level.  This is necessary to prevent sinks for ALL
controllers from being started each time an individual controller starts.
Clear all bucket assignments when controller is stopped.
Mark test methods with @Override annotation.
Add default property file for pooling feature.
Add license to default property file.
Remove tests for doDeleteManager(), as it no longer exists.
Changed " = " to "=" in the property file.

Change-Id: I80c0c3f1879b5a320044db93e3dfa3b7281cda51
Issue-ID: POLICY-774
Signed-off-by: Jim Hahn <jrh3@att.com>
diff --git a/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties b/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties
new file mode 100644
index 0000000..64a6b06
--- /dev/null
+++ b/feature-pooling-dmaap/src/main/feature/config/feature-pooling-dmaap.properties
@@ -0,0 +1,89 @@
+###
+# ============LICENSE_START=======================================================
+# feature-pooling-dmaap
+# ================================================================================
+# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+
+# In general, the feature-specific properties begin with "pooling",
+# and they may be made specific to a controller by prepending with
+# "pooling.<controller-name>", instead.
+#
+# The available properties and their default values are shown below.
+
+# Whether or not the feature is enabled.
+#pooling.enabled=false
+
+# The internal DMaaP topic used by a controller.  Note: the controller
+# name is required for this property.
+#pooling.<controller-name>.topic =
+
+# Maximum number of events to retain in the queue while a new host waits
+# to be assigned work.
+#pooling.offline.queue.limit=1000
+
+# Maximum age, in milliseconds, of events to be retained in the queue.
+# Events older than this are discarded. 
+#pooling.offline.queue.age.milliseconds=60000
+
+# Time, in milliseconds, to wait for an "Offline" message to be published
+# to DMaaP before the connection may be closed.
+#pooling.offline.publish.wait.milliseconds=3000
+
+# Time, in milliseconds, to wait for this host's initial heart beat.  This
+# is used to verify connectivity to the internal DMaaP topic.
+#pooling.start.heartbeat.milliseconds=100000
+
+# Time, in milliseconds, to wait before attempting to re-active this
+# host when it was not assigned any work.
+#pooling.reactivate.milliseconds=50000
+
+# Time, in milliseconds, to wait for other hosts to identify themselves
+# when this host is started.
+#pooling.identification.milliseconds=50000
+
+# Time, in milliseconds, to wait for heart beats from this host, or its
+# predecessor, during the active state.
+#pooling.active.heartbeat.milliseconds=50000
+
+# Time, in milliseconds, to wait between heart beat generations.
+#pooling.inter.heartbeat.milliseconds=15000
+
+# Topic used for inter-host communication for a particular controller
+# pooling.<controller-name>.topic=XXX
+
+# These specify how the request id is to be extracted from each type of
+# object that may be presented to a controller from distributed topics
+# (i.e., topics whose events are to be distributed among multiple hosts)
+extractor.requestId.org.onap.policy.controlloop.VirtualControlLoopEvent=${requestId}
+extractor.requestId.org.onap.policy.appc.Response=${commonHeader.requestId}
+extractor.requestId.org.onap.policy.appclcm.LcmResponseWrapper=${body.commonHeader.requestId}
+
+
+pooling.amsterdam.enabled=true
+pooling.amsterdam.topic=${{AMSTERDAM_POOLING_TOPIC}}
+
+# the list of sources and sinks should be identical
+ueb.source.topics=${{AMSTERDAM_POOLING_TOPIC}}
+ueb.sink.topics=${{AMSTERDAM_POOLING_TOPIC}}
+
+ueb.source.topics.${{AMSTERDAM_POOLING_TOPIC}}.servers=${{DMAAP_SERVERS}}
+ueb.source.topics.${{AMSTERDAM_POOLING_TOPIC}}.apiKey=
+ueb.source.topics.${{AMSTERDAM_POOLING_TOPIC}}.apiSecret=
+
+ueb.sink.topics.${{AMSTERDAM_POOLING_TOPIC}}.servers=${{DMAAP_SERVERS}}
+ueb.sink.topics.${{AMSTERDAM_POOLING_TOPIC}}.apiKey=
+ueb.sink.topics.${{AMSTERDAM_POOLING_TOPIC}}.apiSecret=
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
index abb4da3..eb41f80 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
@@ -21,7 +21,6 @@
 package org.onap.policy.drools.pooling;
 
 import java.util.List;
-import java.util.Properties;
 import org.onap.policy.drools.event.comm.FilterableTopicSource;
 import org.onap.policy.drools.event.comm.TopicEndpoint;
 import org.onap.policy.drools.event.comm.TopicListener;
@@ -31,7 +30,8 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * Manages the internal DMaaP topic.
+ * Manages the internal DMaaP topic. Assumes all topics are managed by
+ * {@link TopicEndpoint#manager}.
  */
 public class DmaapManager {
 
@@ -58,18 +58,6 @@
     private final TopicSink topicSink;
 
     /**
-     * Topic sources. In theory, there's only one item in this list, the internal DMaaP
-     * topic.
-     */
-    private final List<TopicSource> sources;
-
-    /**
-     * Topic sinks. In theory, there's only one item in this list, the internal DMaaP
-     * topic.
-     */
-    private final List<TopicSink> sinks;
-
-    /**
      * {@code True} if the consumer is running, {@code false} otherwise.
      */
     private boolean consuming = false;
@@ -83,17 +71,14 @@
      * Constructs the manager, but does not start the source or sink.
      * 
      * @param topic name of the internal DMaaP topic
-     * @param props properties to configure the topic source & sink
      * @throws PoolingFeatureException if an error occurs
      */
-    public DmaapManager(String topic, Properties props) throws PoolingFeatureException {
+    public DmaapManager(String topic) throws PoolingFeatureException {
 
         logger.info("initializing bus for topic {}", topic);
 
         try {
             this.topic = topic;
-            this.sources = factory.initTopicSources(props);
-            this.sinks = factory.initTopicSinks(props);
 
             this.topicSource = findTopicSource();
             this.topicSink = findTopicSink();
@@ -132,7 +117,7 @@
      * @throws PoolingFeatureException if the source doesn't exist or is not filterable
      */
     private FilterableTopicSource findTopicSource() throws PoolingFeatureException {
-        for (TopicSource src : sources) {
+        for (TopicSource src : factory.getTopicSources()) {
             if (topic.equals(src.getTopic())) {
                 if (src instanceof FilterableTopicSource) {
                     return (FilterableTopicSource) src;
@@ -153,7 +138,7 @@
      * @throws PoolingFeatureException if the sink doesn't exist
      */
     private TopicSink findTopicSink() throws PoolingFeatureException {
-        for (TopicSink sink : sinks) {
+        for (TopicSink sink : factory.getTopicSinks()) {
             if (topic.equals(sink.getTopic())) {
                 return sink;
             }
@@ -164,22 +149,14 @@
 
     /**
      * Starts the publisher, if it isn't already running.
-     * 
-     * @throws PoolingFeatureException if an error occurs
      */
-    public void startPublisher() throws PoolingFeatureException {
+    public void startPublisher() {
         if (publishing) {
             return;
         }
 
-        try {
-            logger.info("start publishing to topic {}", topic);
-            topicSink.start();
-            publishing = true;
-
-        } catch (IllegalStateException e) {
-            throw new PoolingFeatureException("cannot start topic sink " + topic, e);
-        }
+        logger.info("start publishing to topic {}", topic);
+        publishing = true;
     }
 
     /**
@@ -205,14 +182,8 @@
             Thread.currentThread().interrupt();
         }
 
-        try {
-            logger.info("stop publishing to topic {}", topic);
-            publishing = false;
-            topicSink.stop();
-
-        } catch (IllegalStateException e) {
-            logger.error("cannot stop sink for topic {}", topic, e);
-        }
+        logger.info("stop publishing to topic {}", topic);
+        publishing = false;
     }
 
     /**
@@ -288,23 +259,17 @@
     public static class Factory {
 
         /**
-         * Initializes the topic sources.
-         * 
-         * @param props properties used to configure the topics
          * @return the topic sources
          */
-        public List<TopicSource> initTopicSources(Properties props) {
-            return TopicEndpoint.manager.addTopicSources(props);
+        public List<TopicSource> getTopicSources() {
+            return TopicEndpoint.manager.getTopicSources();
         }
 
         /**
-         * Initializes the topic sinks.
-         * 
-         * @param props properties used to configure the topics
          * @return the topic sinks
          */
-        public List<TopicSink> initTopicSinks(Properties props) {
-            return TopicEndpoint.manager.addTopicSinks(props);
+        public List<TopicSink> getTopicSinks() {
+            return TopicEndpoint.manager.getTopicSinks();
         }
 
     }
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
index 1e2071a..67cb21e 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -20,13 +20,16 @@
 
 package org.onap.policy.drools.pooling;
 
+import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.function.Function;
 import org.onap.policy.common.utils.properties.exception.PropertyException;
 import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.event.comm.TopicEndpoint;
+import org.onap.policy.drools.event.comm.TopicSink;
+import org.onap.policy.drools.event.comm.TopicSource;
 import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
 import org.onap.policy.drools.features.DroolsControllerFeatureAPI;
 import org.onap.policy.drools.features.PolicyControllerFeatureAPI;
@@ -53,7 +56,7 @@
     /**
      * Factory used to create objects.
      */
-    private static Factory factory;
+    private static Factory factory = new Factory();
 
     /**
      * ID of this host.
@@ -124,6 +127,10 @@
     public boolean beforeStart(PolicyEngine engine) {
         logger.info("initializing " + PoolingProperties.FEATURE_NAME);
         featProps = factory.getProperties(PoolingProperties.FEATURE_NAME);
+        
+        factory.initTopicSources(featProps);
+        factory.initTopicSinks(featProps);
+        
         return false;
     }
 
@@ -189,17 +196,25 @@
 
     @Override
     public boolean afterStop(PolicyController controller) {
-
-        // NOTE: using doDeleteManager() instead of doManager()
-
-        return doDeleteManager(controller, mgr -> {
-
+        return doManager(controller, mgr -> {
             mgr.afterStop();
             return false;
         });
     }
 
     @Override
+    public boolean afterShutdown(PolicyController controller) {
+        deleteManager(controller);
+        return false;
+    }
+
+    @Override
+    public boolean afterHalt(PolicyController controller) {
+        deleteManager(controller);
+        return false;
+    }
+
+    @Override
     public boolean beforeLock(PolicyController controller) {
         return doManager(controller, mgr -> {
             mgr.beforeLock();
@@ -306,29 +321,17 @@
     }
 
     /**
-     * Executes a function using the manager associated with the controller and then
-     * deletes the manager. Catches any exceptions from the function and re-throws it as a
-     * runtime exception.
+     * Deletes the manager associated with a controller.
      * 
      * @param controller
-     * @param func function to be executed
-     * @return {@code true} if the function handled the request, {@code false} otherwise
      * @throws PoolingFeatureRtException if an error occurs
      */
-    private boolean doDeleteManager(PolicyController controller, Function<PoolingManagerImpl, Boolean> func) {
+    private void deleteManager(PolicyController controller) {
 
         String name = controller.getName();
         logger.info("remove feature-pool-dmaap manager for {}", name);
 
-        // NOTE: using "remove()" instead of "get()"
-
-        PoolingManagerImpl mgr = ctlr2pool.remove(name);
-
-        if (mgr == null) {
-            return false;
-        }
-
-        return func.apply(mgr);
+        ctlr2pool.remove(name);
     }
 
     /**
@@ -416,5 +419,25 @@
         public PolicyController getController(DroolsController droolsController) {
             return PolicyController.factory.get(droolsController);
         }
+
+        /**
+         * Initializes the topic sources.
+         * 
+         * @param props properties used to configure the topics
+         * @return the topic sources
+         */
+        public List<TopicSource> initTopicSources(Properties props) {
+            return TopicEndpoint.manager.addTopicSources(props);
+        }
+
+        /**
+         * Initializes the topic sinks.
+         * 
+         * @param props properties used to configure the topics
+         * @return the topic sinks
+         */
+        public List<TopicSink> initTopicSinks(Properties props) {
+            return TopicEndpoint.manager.addTopicSinks(props);
+        }
     }
 }
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index de25e47..33f4508 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -45,7 +45,6 @@
 import org.onap.policy.drools.pooling.state.StartState;
 import org.onap.policy.drools.pooling.state.State;
 import org.onap.policy.drools.pooling.state.StateTimerTask;
-import org.onap.policy.drools.properties.PolicyProperties;
 import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
 import org.onap.policy.drools.system.PolicyController;
 import org.slf4j.Logger;
@@ -177,8 +176,7 @@
             this.topic = props.getPoolingTopic();
             this.eventq = factory.makeEventQueue(props);
             this.extractors = factory.makeClassExtractors(makeExtractorProps(controller, props.getSource()));
-            this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic(),
-                            makeDmaapProps(controller, props.getSource()));
+            this.dmaapMgr = factory.makeDmaapManager(props.getPoolingTopic());
             this.current = new IdleState(this);
 
             logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
@@ -237,39 +235,6 @@
     }
 
     /**
-     * Makes properties for configuring DMaaP. Copies properties from the source that
-     * start with the Pooling property prefix followed by the controller name, stripping
-     * the prefix and controller name.
-     * 
-     * @param controller the controller for which DMaaP will be configured
-     * @param source properties from which to get the DMaaP properties
-     * @return DMaaP properties
-     */
-    private Properties makeDmaapProps(PolicyController controller, Properties source) {
-        SpecProperties specProps = new SpecProperties("", "controller." + controller.getName(), source);
-
-        // could be UEB or DMAAP, so add both
-        addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
-        addDmaapConsumerProps(specProps, PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
-
-        return specProps;
-    }
-
-    /**
-     * Adds DMaaP consumer properties, consumer group & instance. The group is the host
-     * and the instance is a constant.
-     * 
-     * @param props where to add the new properties
-     * @param prefix property prefix
-     */
-    private void addDmaapConsumerProps(SpecProperties props, String prefix) {
-        String fullpfx = props.getSpecPrefix() + prefix + "." + topic;
-
-        props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, host);
-        props.setProperty(fullpfx + PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, "0");
-    }
-
-    /**
      * Indicates that the controller is about to start. Starts the publisher for the
      * internal topic, and creates a thread pool for the timers.
      * 
@@ -325,6 +290,8 @@
                 dmaapMgr.stopConsumer(this);
                 publishAdmin(new Offline(getHost()));
             }
+
+            assignments = null;
         }
 
         if (sched != null) {
@@ -902,12 +869,11 @@
          * Creates a DMaaP manager.
          * 
          * @param topic name of the internal DMaaP topic
-         * @param props properties used to configure DMaaP
          * @return a new DMaaP manager
          * @throws PoolingFeatureException if an error occurs
          */
-        public DmaapManager makeDmaapManager(String topic, Properties props) throws PoolingFeatureException {
-            return new DmaapManager(topic, props);
+        public DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
+            return new DmaapManager(topic);
         }
 
         /**
diff --git a/feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureAPI b/feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI
similarity index 100%
rename from feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureAPI
rename to feature-pooling-dmaap/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
index a5688df..6509e90 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/DmaapManagerTest.java
@@ -24,16 +24,14 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import java.util.Arrays;
 import java.util.LinkedList;
-import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -56,7 +54,6 @@
      */
     private static Factory saveFactory;
 
-    private Properties props;
     private Factory factory;
     private TopicListener listener;
     private FilterableTopicSource source;
@@ -75,8 +72,6 @@
 
     @Before
     public void setUp() throws Exception {
-        props = new Properties();
-
         listener = mock(TopicListener.class);
         factory = mock(Factory.class);
         source = mock(FilterableTopicSource.class);
@@ -90,21 +85,21 @@
         when(sink.send(any())).thenReturn(true);
 
         // three sources, with the desired one in the middle
-        when(factory.initTopicSources(props))
+        when(factory.getTopicSources())
                         .thenReturn(Arrays.asList(mock(TopicSource.class), source, mock(TopicSource.class)));
 
         // three sinks, with the desired one in the middle
-        when(factory.initTopicSinks(props))
+        when(factory.getTopicSinks())
                         .thenReturn(Arrays.asList(mock(TopicSink.class), sink, mock(TopicSink.class)));
 
-        mgr = new DmaapManager(MY_TOPIC, props);
+        mgr = new DmaapManager(MY_TOPIC);
     }
 
     @Test
     public void testDmaapManager() {
         // verify that the init methods were called
-        verify(factory).initTopicSinks(props);
-        verify(factory).initTopicSinks(props);
+        verify(factory).getTopicSinks();
+        verify(factory).getTopicSinks();
     }
 
     @Test(expected = PoolingFeatureException.class)
@@ -112,15 +107,15 @@
         // force error by having no topics match
         when(source.getTopic()).thenReturn("");
 
-        new DmaapManager(MY_TOPIC, props);
+        new DmaapManager(MY_TOPIC);
     }
 
     @Test(expected = PoolingFeatureException.class)
     public void testDmaapManager_IllegalArgEx() throws PoolingFeatureException {
         // force error
-        when(factory.initTopicSources(props)).thenThrow(new IllegalArgumentException("expected"));
+        when(factory.getTopicSources()).thenThrow(new IllegalArgumentException("expected"));
 
-        new DmaapManager(MY_TOPIC, props);
+        new DmaapManager(MY_TOPIC);
     }
 
     @Test(expected = PoolingFeatureException.class)
@@ -128,7 +123,7 @@
         // force an error when setFilter() is called
         doThrow(new UnsupportedOperationException("expected")).when(source).setFilter(any());
 
-        new DmaapManager(MY_TOPIC, props);
+        new DmaapManager(MY_TOPIC);
     }
 
     @Test
@@ -148,25 +143,25 @@
         TopicSource source2 = mock(TopicSource.class);
         when(source2.getTopic()).thenReturn(MY_TOPIC);
 
-        when(factory.initTopicSources(props)).thenReturn(Arrays.asList(source2));
+        when(factory.getTopicSources()).thenReturn(Arrays.asList(source2));
 
-        new DmaapManager(MY_TOPIC, props);
+        new DmaapManager(MY_TOPIC);
     }
 
     @Test(expected = PoolingFeatureException.class)
     public void testFindTopicSource_NotFound() throws PoolingFeatureException {
         // one item in list, and its topic doesn't match
-        when(factory.initTopicSources(props)).thenReturn(Arrays.asList(mock(TopicSource.class)));
+        when(factory.getTopicSources()).thenReturn(Arrays.asList(mock(TopicSource.class)));
 
-        new DmaapManager(MY_TOPIC, props);
+        new DmaapManager(MY_TOPIC);
     }
 
     @Test(expected = PoolingFeatureException.class)
     public void testFindTopicSource_EmptyList() throws PoolingFeatureException {
         // empty list
-        when(factory.initTopicSources(props)).thenReturn(new LinkedList<>());
+        when(factory.getTopicSources()).thenReturn(new LinkedList<>());
 
-        new DmaapManager(MY_TOPIC, props);
+        new DmaapManager(MY_TOPIC);
     }
 
     @Test
@@ -177,50 +172,26 @@
     @Test(expected = PoolingFeatureException.class)
     public void testFindTopicSink_NotFound() throws PoolingFeatureException {
         // one item in list, and its topic doesn't match
-        when(factory.initTopicSinks(props)).thenReturn(Arrays.asList(mock(TopicSink.class)));
+        when(factory.getTopicSinks()).thenReturn(Arrays.asList(mock(TopicSink.class)));
 
-        new DmaapManager(MY_TOPIC, props);
+        new DmaapManager(MY_TOPIC);
     }
 
     @Test(expected = PoolingFeatureException.class)
     public void testFindTopicSink_EmptyList() throws PoolingFeatureException {
         // empty list
-        when(factory.initTopicSinks(props)).thenReturn(new LinkedList<>());
+        when(factory.getTopicSinks()).thenReturn(new LinkedList<>());
 
-        new DmaapManager(MY_TOPIC, props);
+        new DmaapManager(MY_TOPIC);
     }
 
     @Test
     public void testStartPublisher() throws PoolingFeatureException {
-        // not started yet
-        verify(sink, never()).start();
-
+        
         mgr.startPublisher();
-        verify(sink).start();
 
         // restart should have no effect
         mgr.startPublisher();
-        verify(sink).start();
-
-        // should be able to publish now
-        mgr.publish(MSG);
-        verify(sink).send(MSG);
-    }
-
-    @Test
-    public void testStartPublisher_Exception() throws PoolingFeatureException {
-        // force exception when it starts
-        doThrow(new IllegalStateException("expected")).when(sink).start();
-
-        expectException("startPublisher,start", () -> mgr.startPublisher());
-        expectException("startPublisher,publish", () -> mgr.publish(MSG));
-
-        // allow it to succeed this time
-        reset(sink);
-        when(sink.send(any())).thenReturn(true);
-
-        mgr.startPublisher();
-        verify(sink).start();
 
         // should be able to publish now
         mgr.publish(MSG);
@@ -231,18 +202,15 @@
     public void testStopPublisher() throws PoolingFeatureException {
         // not publishing yet, so stopping should have no effect
         mgr.stopPublisher(0);
-        verify(sink, never()).stop();
 
         // now start it
         mgr.startPublisher();
 
         // this time, stop should do something
         mgr.stopPublisher(0);
-        verify(sink).stop();
 
         // re-stopping should have no effect
         mgr.stopPublisher(0);
-        verify(sink).stop();
     }
 
     @Test
@@ -285,16 +253,6 @@
     }
 
     @Test
-    public void testStopPublisher_Exception() throws PoolingFeatureException {
-        mgr.startPublisher();
-
-        // force exception when it stops
-        doThrow(new IllegalStateException("expected")).when(sink).stop();
-
-        mgr.stopPublisher(0);
-    }
-
-    @Test
     public void testStartConsumer() {
         // not started yet
         verify(source, never()).register(any());
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
index cc58838..d453e74 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest.java
@@ -874,11 +874,6 @@
     private static class TopicImpl implements Topic {
 
         /**
-         * {@code True} if this topic is alive/running, {@code false} otherwise.
-         */
-        private boolean alive = false;
-
-        /**
          * 
          */
         public TopicImpl() {
@@ -917,32 +912,22 @@
 
         @Override
         public synchronized boolean start() {
-            if (alive) {
-                throw new IllegalStateException("topic already started");
-            }
-
-            alive = true;
             return true;
         }
 
         @Override
         public synchronized boolean stop() {
-            if (!alive) {
-                throw new IllegalStateException("topic is not running");
-            }
-
-            alive = false;
             return true;
         }
 
         @Override
         public synchronized void shutdown() {
-            alive = false;
+            // do nothing
         }
 
         @Override
         public synchronized boolean isAlive() {
-            return alive;
+            return true;
         }
 
         @Override
@@ -1081,12 +1066,12 @@
         }
 
         @Override
-        public List<TopicSource> initTopicSources(Properties props) {
+        public List<TopicSource> getTopicSources() {
             return Arrays.asList(new TopicSourceImpl(context, true));
         }
 
         @Override
-        public List<TopicSink> initTopicSinks(Properties props) {
+        public List<TopicSink> getTopicSinks() {
             return Arrays.asList(new TopicSinkImpl(context));
         }
     }
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
index f8f3755..32264e3 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingFeatureTest.java
@@ -271,11 +271,28 @@
         assertFalse(pool.afterStop(controller1));
         verify(mgr1).afterStop();
 
-        // ensure it has been removed from the map by re-invoking
-        assertFalse(pool.afterStop(controller1));
-
+        assertFalse(pool.afterStop(controllerDisabled));
+        
         // count should be unchanged
         verify(mgr1).afterStop();
+    }
+
+    @Test
+    public void testAfterHalt() {
+        assertFalse(pool.afterHalt(controller1));
+        assertFalse(pool.afterHalt(controller1));
+        
+        verify(mgr1, never()).afterStop();
+
+        assertFalse(pool.afterStop(controllerDisabled));
+    }
+
+    @Test
+    public void testAfterShutdown() {
+        assertFalse(pool.afterShutdown(controller1));
+        assertFalse(pool.afterShutdown(controller1));
+        
+        verify(mgr1, never()).afterStop();
 
         assertFalse(pool.afterStop(controllerDisabled));
     }
@@ -464,46 +481,6 @@
         pool.beforeStart(controller1);
     }
 
-    @Test
-    public void testDoDeleteManager() {
-        assertFalse(pool.afterStop(controller1));
-        verify(mgr1).afterStop();
-
-        // ensure it has been removed from the map by re-invoking
-        assertFalse(pool.afterStop(controller1));
-
-        // count should be unchanged
-        verify(mgr1).afterStop();
-
-
-        // different controller
-        assertFalse(pool.afterStop(controller2));
-        verify(mgr2).afterStop();
-
-        // ensure it has been removed from the map by re-invoking
-        assertFalse(pool.afterStop(controller2));
-
-        // count should be unchanged
-        verify(mgr2).afterStop();
-
-
-        assertFalse(pool.afterStop(controllerDisabled));
-    }
-
-    @Test
-    public void testDoDeleteManager_NotFound() {
-        assertFalse(pool.afterStop(controllerDisabled));
-    }
-
-    @Test(expected = PoolingFeatureRtException.class)
-    public void testDoDeleteManager_Ex() {
-
-        // generate exception
-        doThrow(new PoolingFeatureRtException()).when(mgr1).afterStop();
-
-        pool.afterStop(controller1);
-    }
-
     private Properties initProperties() {
         Properties props = new Properties();
 
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
index e0024b7..64573ab 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/PoolingManagerImplTest.java
@@ -155,7 +155,7 @@
 
         when(factory.makeEventQueue(any())).thenReturn(eventQueue);
         when(factory.makeClassExtractors(any())).thenReturn(extractors);
-        when(factory.makeDmaapManager(any(), any())).thenReturn(dmaap);
+        when(factory.makeDmaapManager(any())).thenReturn(dmaap);
         when(factory.makeScheduler()).thenReturn(sched);
         when(factory.canDecodeEvent(drools, TOPIC2)).thenReturn(true);
         when(factory.decodeEvent(drools, TOPIC2, THE_EVENT)).thenReturn(DECODED_EVENT);
@@ -188,7 +188,7 @@
 
     @Test
     public void testPoolingManagerImpl() throws Exception {
-        verify(factory).makeDmaapManager(any(), any());
+        verify(factory).makeDmaapManager(any());
 
         State st = mgr.getCurrent();
         assertTrue(st instanceof IdleState);
@@ -215,7 +215,7 @@
     public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException {
         // throw an exception when we try to create the dmaap manager
         PoolingFeatureException ex = new PoolingFeatureException();
-        when(factory.makeDmaapManager(any(), any())).thenThrow(ex);
+        when(factory.makeDmaapManager(any())).thenThrow(ex);
 
         PoolingFeatureRtException ex2 = expectException(PoolingFeatureRtException.class,
                         () -> new PoolingManagerImpl(MY_HOST, controller, poolProps, active));
@@ -272,19 +272,6 @@
     }
 
     @Test
-    public void testBeforeStart_DmaapEx() throws Exception {
-        // generate an exception
-        PoolingFeatureException ex = new PoolingFeatureException();
-        doThrow(ex).when(dmaap).startPublisher();
-
-        PoolingFeatureException ex2 = expectException(PoolingFeatureException.class, () -> mgr.beforeStart());
-        assertEquals(ex, ex2);
-
-        // should never start the scheduler
-        verify(factory, never()).makeScheduler();
-    }
-
-    @Test
     public void testAfterStart() throws Exception {
         startMgr();
 
@@ -317,7 +304,13 @@
     @Test
     public void testBeforeStop() throws Exception {
         startMgr();
+        mgr.startDistributing(makeAssignments(true));
 
+        // verify that this message is not queued
+        Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
+        mgr.handle(msg);
+        verify(eventQueue, never()).add(msg);
+        
         mgr.beforeStop();
 
         verify(dmaap).stopConsumer(mgr);
@@ -325,6 +318,10 @@
         verify(dmaap).publish(contains("offline"));
 
         assertTrue(mgr.getCurrent() instanceof IdleState);
+
+        // verify that next message is queued
+        mgr.handle(msg);        
+        verify(eventQueue).add(msg);
     }
 
     @Test
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java
index c56caca..bc92fa2 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/ForwardTest.java
@@ -177,11 +177,7 @@
         expectCheckValidityFailure(msg -> msg.setNumHops(-1));
     }
 
-    /**
-     * Makes a message that will pass the validity check.
-     * 
-     * @return a valid Message
-     */
+    @Override
     public Forward makeValidMessage() {
         tcreateMs = System.currentTimeMillis();
 
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java
index da78dbe..43f1afd 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/HeartbeatTest.java
@@ -33,11 +33,7 @@
         super(Heartbeat.class);
     }
 
-    /**
-     * Makes a message that will pass the validity check.
-     * 
-     * @return a valid Message
-     */
+    @Override
     public Heartbeat makeValidMessage() {
         Heartbeat msg = new Heartbeat(VALID_HOST, ++sequence);
         msg.setChannel(VALID_CHANNEL);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java
index 8255034..7b28afc 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/IdentificationTest.java
@@ -62,11 +62,7 @@
         msg.checkValidity();
     }
 
-    /**
-     * Makes a message that will pass the validity check.
-     * 
-     * @return a valid Message
-     */
+    @Override
     public Identification makeValidMessage() {
         Identification msg = new Identification(VALID_HOST, (isNullAssignments() ? null : VALID_ASGN));
         msg.setChannel(VALID_CHANNEL);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java
index 0f58e22..e30d7d0 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/LeaderTest.java
@@ -62,11 +62,7 @@
         expectCheckValidityFailure(msg -> msg.setAssignments(asgnNotSmallest));
     }
 
-    /**
-     * Makes a message that will pass the validity check.
-     * 
-     * @return a valid Message
-     */
+    @Override
     public Leader makeValidMessage() {
         Leader msg = new Leader(VALID_HOST, (isNullAssignments() ? null : VALID_ASGN));
         msg.setChannel(VALID_CHANNEL);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java
index 8d0f4a6..4fe3736 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/OfflineTest.java
@@ -26,11 +26,7 @@
         super(Offline.class);
     }
 
-    /**
-     * Makes a message that will pass the validity check.
-     * 
-     * @return a valid Message
-     */
+    @Override
     public Offline makeValidMessage() {
         Offline msg = new Offline(VALID_HOST);
         msg.setChannel(VALID_CHANNEL);
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java
index 0b2a986..e0ab016 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/message/QueryTest.java
@@ -26,11 +26,7 @@
         super(Query.class);
     }
 
-    /**
-     * Makes a message that will pass the validity check.
-     * 
-     * @return a valid Message
-     */
+    @Override
     public Query makeValidMessage() {
         Query msg = new Query(VALID_HOST);
         msg.setChannel(VALID_CHANNEL);