Remove sleep from DmaapMessageConsumer

Now that the timeout parameter is used when polling MR, the consumer
doesn't have to sleep between polls.

Also fixes the MR simulator so it responds correctly to the responses
from the policy agent.

Change-Id: I56d8be6a762013503defcd4f7297daa90a0a293e
Issue-ID: NONRTRIC-209
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/Application.java b/policy-agent/src/main/java/org/oransc/policyagent/Application.java
index 1c397fe..3bc7326 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/Application.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/Application.java
@@ -20,6 +20,7 @@
 
 package org.oransc.policyagent;
 
+import org.oransc.policyagent.dmaap.DmaapMessageConsumer;
 import org.oransc.policyagent.tasks.RefreshConfigTask;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.CommandLineRunner;
@@ -34,19 +35,34 @@
     @Autowired
     private RefreshConfigTask configRefresh;
 
+    @Autowired
+    private DmaapMessageConsumer dmaapMessageConsumer;
+
     public static void main(String[] args) {
         SpringApplication.run(Application.class);
     }
 
     /**
-     * Starts the service and reads the configuration.
+     * Starts the configuration refresh task and reads the configuration.
      *
      * @param ctx the application context.
      *
-     * @return the command line runner performing tasks at startup.
+     * @return the command line runner for the configuration refresh task.
      */
     @Bean
-    public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
+    public CommandLineRunner configRefreshRunner(ApplicationContext ctx) {
         return args -> configRefresh.start();
     }
+
+    /**
+     * Starts the DMaaP message consumer service.
+     *
+     * @param ctx the application context.
+     *
+     * @return the command line runner for the DMaaP message consumer service.
+     */
+    @Bean
+    public CommandLineRunner dmaapMessageConsumerRunner(ApplicationContext ctx) {
+        return args -> dmaapMessageConsumer.start();
+    }
 }
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
index da209a8..dd60d39 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
@@ -33,6 +33,7 @@
 import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.exceptions.ServiceException;
+import org.oransc.policyagent.tasks.RefreshConfigTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -40,16 +41,22 @@
 import org.springframework.stereotype.Component;
 
 /**
- * The class fetches incoming requests from DMAAP on regular intervals. Each
- * received request is proceesed by DmaapMessageHandler.
+ * The class fetches incoming requests from DMAAP. It uses the timeout parameter that lets the MessageRouter keep the
+ * connection with the Kafka open until requests are sent in.
+ *
+ * If there is no DMaaP configuration in the application configuration, then this service will regularly check the
+ * configuration and start polling DMaaP if the configuration is added. If the DMaaP configuration is removed, then the
+ * service will stop polling and resume checking for configuration.
+ *
+ * Each received request is processed by {@link DmaapMessageHandler}.
  */
 @Component
-public class DmaapMessageConsumer implements Runnable {
+public class DmaapMessageConsumer {
+
+    protected static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10);
 
     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
 
-    private static final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10);
-
     private final ApplicationConfig applicationConfig;
 
     @Value("${server.port}")
@@ -58,20 +65,26 @@
     @Autowired
     public DmaapMessageConsumer(ApplicationConfig applicationConfig) {
         this.applicationConfig = applicationConfig;
+    }
 
-        Thread thread = new Thread(this);
+    public Thread start() {
+        Thread thread = new Thread(() -> this.checkConfigLoop());
         thread.start();
+        return thread;
     }
 
-    private boolean isDmaapConfigured() {
-        Properties consumerCfg = applicationConfig.getDmaapConsumerConfig();
-        Properties producerCfg = applicationConfig.getDmaapPublisherConfig();
-        return (consumerCfg != null && consumerCfg.size() > 0 && producerCfg != null && producerCfg.size() > 0);
+    private void checkConfigLoop() {
+        while (!isStopped()) {
+            if (isDmaapConfigured()) {
+                messageHandlingLoop();
+            } else {
+                sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
+            }
+        }
     }
 
-    @Override
-    public void run() {
-        while (sleep(TIME_BETWEEN_DMAAP_POLLS) && isDmaapConfigured()) {
+    private void messageHandlingLoop() {
+        while (!isStopped() && isDmaapConfigured()) {
             try {
                 Iterable<String> dmaapMsgs = fetchAllMessages();
                 if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
@@ -81,13 +94,23 @@
                     }
                 }
             } catch (Exception e) {
-                logger.warn("{}: cannot fetch because of {}", this, e.getMessage());
-                sleep(TIME_BETWEEN_DMAAP_POLLS);
+                logger.warn("Cannot fetch because of {}", e.getMessage());
+                sleep(TIME_BETWEEN_DMAAP_RETRIES);
             }
         }
     }
 
-    private Iterable<String> fetchAllMessages() throws ServiceException, IOException {
+    protected boolean isStopped() {
+        return false;
+    }
+
+    protected boolean isDmaapConfigured() {
+        Properties consumerCfg = applicationConfig.getDmaapConsumerConfig();
+        Properties producerCfg = applicationConfig.getDmaapPublisherConfig();
+        return (consumerCfg != null && consumerCfg.size() > 0 && producerCfg != null && producerCfg.size() > 0);
+    }
+
+    protected Iterable<String> fetchAllMessages() throws ServiceException, IOException {
         Properties dmaapConsumerProperties = this.applicationConfig.getDmaapConsumerConfig();
         MRConsumer consumer = getMessageRouterConsumer(dmaapConsumerProperties);
         MRConsumerResponse response = consumer.fetchWithReturnConsumerResponse();
@@ -118,29 +141,27 @@
         return createDmaapMessageHandler(agentClient, producer);
     }
 
-    boolean sleep(Duration duration) {
+    protected void sleep(Duration duration) {
         try {
             Thread.sleep(duration.toMillis());
-            return true;
         } catch (Exception e) {
             logger.error("Failed to put the thread to sleep", e);
-            return false;
         }
     }
 
-    MRConsumer getMessageRouterConsumer(Properties dmaapConsumerProperties) throws IOException {
+    protected MRConsumer getMessageRouterConsumer(Properties dmaapConsumerProperties) throws IOException {
         return MRClientFactory.createConsumer(dmaapConsumerProperties);
     }
 
-    DmaapMessageHandler createDmaapMessageHandler(AsyncRestClient agentClient, MRBatchingPublisher producer) {
+    protected DmaapMessageHandler createDmaapMessageHandler(AsyncRestClient agentClient, MRBatchingPublisher producer) {
         return new DmaapMessageHandler(producer, agentClient);
     }
 
-    AsyncRestClient createRestClient(String agentBaseUrl) {
+    protected AsyncRestClient createRestClient(String agentBaseUrl) {
         return new AsyncRestClient(agentBaseUrl);
     }
 
-    MRBatchingPublisher getMessageRouterPublisher(Properties dmaapPublisherProperties) throws IOException {
+    protected MRBatchingPublisher getMessageRouterPublisher(Properties dmaapPublisherProperties) throws IOException {
         return MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
     }
 }
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
index 05bcb0f..12bc55a 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
@@ -80,6 +80,11 @@
     @Value("#{systemEnvironment}")
     public Properties systemEnvironment;
 
+    /**
+     * The time between refreshes of the configuration.
+     */
+    public static final Duration CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
+
     final ApplicationConfig appConfig;
     @Getter(AccessLevel.PROTECTED)
     private Disposable refreshTask = null;
@@ -90,8 +95,6 @@
     private final Policies policies;
     private final Services services;
     private final PolicyTypes policyTypes;
-    private static final Duration FILE_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
-    private static final Duration CONSUL_CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
 
     @Autowired
     public RefreshConfigTask(ApplicationConfig appConfig, Rics rics, Policies policies, Services services,
@@ -120,14 +123,14 @@
     }
 
     Flux<RicConfigUpdate.Type> createRefreshTask() {
-        Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, FILE_CONFIG_REFRESH_INTERVAL) //
+        Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, CONFIG_REFRESH_INTERVAL) //
             .filter(notUsed -> !this.isConsulUsed) //
             .flatMap(notUsed -> loadConfigurationFromFile()) //
             .onErrorResume(this::ignoreErrorFlux) //
             .doOnNext(json -> logger.debug("loadFromFile succeeded")) //
             .doOnTerminate(() -> logger.error("loadFromFile Terminate"));
 
-        Flux<JsonObject> loadFromConsul = Flux.interval(Duration.ZERO, CONSUL_CONFIG_REFRESH_INTERVAL) //
+        Flux<JsonObject> loadFromConsul = Flux.interval(Duration.ZERO, CONFIG_REFRESH_INTERVAL) //
             .flatMap(i -> getEnvironment(systemEnvironment)) //
             .flatMap(this::createCbsClient) //
             .flatMap(this::getFromCbs) //
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java
index 8dd3647..fc4e7ce 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java
@@ -20,29 +20,33 @@
 
 package org.oransc.policyagent.dmaap;
 
+import static ch.qos.logback.classic.Level.WARN;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
 
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.onap.dmaap.mr.client.MRBatchingPublisher;
@@ -50,13 +54,12 @@
 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
 import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.oransc.policyagent.tasks.RefreshConfigTask;
 import org.oransc.policyagent.utils.LoggingUtils;
 import org.springframework.http.HttpStatus;
 
 @ExtendWith(MockitoExtension.class)
 public class DmaapMessageConsumerTest {
-    final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10);
-
     @Mock
     private ApplicationConfig applicationConfigMock;
     @Mock
@@ -66,98 +69,110 @@
 
     private DmaapMessageConsumer messageConsumerUnderTest;
 
+    @AfterEach
+    public void resetLogging() {
+        LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
+    }
+
     @Test
-    public void dmaapNotConfigured_thenDoNothing() {
+    public void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        doReturn(true).when(messageConsumerUnderTest).sleep(any(Duration.class));
+        doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
+        doReturn(false, false, false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(false, true, true).when(messageConsumerUnderTest).isDmaapConfigured();
+        doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
 
-        messageConsumerUnderTest.run();
+        messageConsumerUnderTest.start().join();
 
-        verify(messageConsumerUnderTest).sleep(TIME_BETWEEN_DMAAP_POLLS);
-        verify(applicationConfigMock).getDmaapConsumerConfig();
-        verify(applicationConfigMock).getDmaapPublisherConfig();
-        verifyNoMoreInteractions(applicationConfigMock);
+        InOrder orderVerifier = inOrder(messageConsumerUnderTest);
+        orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
+        orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
+    }
+
+    @Test
+    public void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception {
+        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
+
+        doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
+        doReturn(false, false, false, false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(true, true, false).when(messageConsumerUnderTest).isDmaapConfigured();
+        doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
+
+        messageConsumerUnderTest.start().join();
+
+        InOrder orderVerifier = inOrder(messageConsumerUnderTest);
+        orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
+        orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
     }
 
     @Test
     public void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
+        setUpMrConfig();
+
         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
-
-        Properties properties = new Properties();
-        properties.put("key", "value");
-        when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
-        when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
-
         MRConsumerResponse response = new MRConsumerResponse();
         response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
         response.setActualMessages(Collections.emptyList());
 
+        doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
             .getMessageRouterConsumer(any(Properties.class));
-        doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+        when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
 
-        messageConsumerUnderTest.run();
-
-        verify(messageConsumerUnderTest, times(2)).sleep(TIME_BETWEEN_DMAAP_POLLS);
-
-        verify(applicationConfigMock, times(2)).getDmaapConsumerConfig();
-        verify(applicationConfigMock).getDmaapPublisherConfig();
-        verifyNoMoreInteractions(applicationConfigMock);
+        messageConsumerUnderTest.start().join();
 
         verify(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
         verifyNoMoreInteractions(messageRouterConsumerMock);
     }
 
     @Test
-    public void dmaapConfiguredAndErrorGettingMessages_thenLogWarning() throws Exception {
+    public void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception {
+        setUpMrConfig();
+
         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
-
-        Properties properties = new Properties();
-        properties.put("key", "value");
-        when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
-        when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
-
-        MRConsumerResponse response = new MRConsumerResponse();
-        response.setResponseCode(Integer.toString(HttpStatus.BAD_REQUEST.value()));
-        response.setResponseMessage("Error");
+        doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
+        doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
         doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
             .getMessageRouterConsumer(any(Properties.class));
-        doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
 
-        final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
+        MRConsumerResponse response = new MRConsumerResponse();
+        int responseCode = HttpStatus.BAD_REQUEST.value();
+        response.setResponseCode(Integer.toString(responseCode));
+        String responseMessage = "Error";
+        response.setResponseMessage(responseMessage);
+        when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
 
-        messageConsumerUnderTest.run();
+        final ListAppender<ILoggingEvent> logAppender =
+            LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN);
 
-        assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
-        assertThat(
-            logAppender.list.toString().contains(": cannot fetch because of Error respons 400 Error from DMaaP."))
+        messageConsumerUnderTest.start().join();
+
+        assertThat(logAppender.list.toString()
+            .contains("Cannot fetch because of Error respons " + responseCode + " " + responseMessage + " from DMaaP."))
                 .isTrue();
+
+        verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
     }
 
     @Test
     public void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
+        Properties properties = setUpMrConfig();
+
         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        doReturn(true, false).when(messageConsumerUnderTest).sleep(any(Duration.class));
-
-        Properties properties = new Properties();
-        properties.put("key", "value");
-        when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
-        when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+        doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
+            .getMessageRouterConsumer(any(Properties.class));
 
         MRConsumerResponse response = new MRConsumerResponse();
         response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
-        List<String> messages = Arrays.asList("message");
+        String responseMessage = "message";
+        List<String> messages = Arrays.asList(responseMessage);
         response.setActualMessages(messages);
-
-        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
-            .getMessageRouterConsumer(any(Properties.class));
-        doReturn(response).when(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+        when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
 
         doReturn(messageHandlerMock).when(messageConsumerUnderTest)
             .createDmaapMessageHandler(any(AsyncRestClient.class), any(MRBatchingPublisher.class));
@@ -168,12 +183,20 @@
         MRBatchingPublisher publisherMock = mock(MRBatchingPublisher.class);
         doReturn(publisherMock).when(messageConsumerUnderTest).getMessageRouterPublisher(any(Properties.class));
 
-        messageConsumerUnderTest.run();
+        messageConsumerUnderTest.start().join();
 
         verify(messageConsumerUnderTest).createRestClient("https://localhost:0");
         verify(messageConsumerUnderTest).getMessageRouterPublisher(properties);
 
-        verify(messageHandlerMock).handleDmaapMsg("message");
+        verify(messageHandlerMock).handleDmaapMsg(responseMessage);
         verifyNoMoreInteractions(messageHandlerMock);
     }
+
+    private Properties setUpMrConfig() {
+        Properties properties = new Properties();
+        properties.put("key", "value");
+        when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
+        when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
+        return properties;
+    }
 }
diff --git a/test/mrstub/mr.py b/test/mrstub/mr.py
index 1094e7c..9c5a2c8 100644
--- a/test/mrstub/mr.py
+++ b/test/mrstub/mr.py
@@ -212,9 +212,9 @@
             print(AGENT_WRITE_URL+ " msg+status (correlationid="+id+") :" + str(msg_str))
     except Exception as e:
         print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
-        return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
+        return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON)
 
-    return Response('OK', status=200, mimetype=MIME_TEXT)
+    return Response('{}', status=200, mimetype=MIME_JSON)
 
 
 ### Functions for metrics read out ###