Merge "PnfReadyEventConsumer implementation"
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
similarity index 78%
rename from bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java
rename to bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
index 1fd2de9..830574b 100644
--- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java
+++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
@@ -20,7 +20,6 @@
 
 package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Map;
@@ -38,7 +37,7 @@
 import org.openecomp.mso.jsonpath.JsonPathUtil;
 import org.openecomp.mso.logger.MsoLogger;
 
-public class PnfEventReadyConsumer implements DmaapClient {
+public class PnfEventReadyDmaapClient implements DmaapClient {
 
     private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA);
 
@@ -57,7 +56,7 @@
     private int dmaapClientDelayInSeconds;
     private volatile boolean dmaapThreadListenerIsRunning;
 
-    public PnfEventReadyConsumer() {
+    public PnfEventReadyDmaapClient() {
         httpClient = HttpClientBuilder.create().build();
         pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
         executor = null;
@@ -67,17 +66,6 @@
         getRequest = new HttpGet(buildURI());
     }
 
-    //TODO: extract this logic to separate class and test it there to avoid using VisibleForTesting
-    @VisibleForTesting
-    void sendRequest() {
-        try {
-            HttpResponse response = httpClient.execute(getRequest);
-            getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound);
-        } catch (IOException e) {
-            LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e);
-        }
-    }
-
     @Override
     public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) {
         pnfCorrelationIdToThreadMap.put(correlationId, informConsumer);
@@ -98,7 +86,7 @@
     private synchronized void startDmaapThreadListener() {
         if (!dmaapThreadListenerIsRunning) {
             executor = Executors.newScheduledThreadPool(1);
-            executor.scheduleWithFixedDelay(this::sendRequest, 0,
+            executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0,
                     dmaapClientDelayInSeconds, TimeUnit.SECONDS);
             dmaapThreadListenerIsRunning = true;
         }
@@ -120,24 +108,6 @@
                 .path(consumerGroup).path(consumerId).build();
     }
 
-    private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException {
-        if (response.getStatusLine().getStatusCode() == 200) {
-            String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
-            if (responseString != null) {
-                return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID);
-            }
-        }
-        return Optional.empty();
-    }
-
-
-    private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
-        Runnable runnable = unregister(correlationId);
-        if (runnable != null) {
-            runnable.run();
-        }
-    }
-
     public void setDmaapHost(String dmaapHost) {
         this.dmaapHost = dmaapHost;
     }
@@ -170,4 +140,34 @@
         this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds;
     }
 
+    class DmaapTopicListenerThread implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                HttpResponse response = httpClient.execute(getRequest);
+                getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound);
+            } catch (IOException e) {
+                LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e);
+            }
+        }
+
+        private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException {
+            if (response.getStatusLine().getStatusCode() == 200) {
+                String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
+                if (responseString != null) {
+                    return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID);
+                }
+            }
+            return Optional.empty();
+        }
+
+        private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
+            Runnable runnable = unregister(correlationId);
+            if (runnable != null) {
+                runnable.run();
+            }
+        }
+    }
+
 }
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml b/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml
index 4ddeba1..13ab4f8 100644
--- a/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml
+++ b/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml
@@ -15,14 +15,14 @@
   </bean>

 

   <bean id="informDmaapClient" class="org.openecomp.mso.bpmn.infrastructure.pnf.delegate.InformDmaapClient">

-    <property name="dmaapClient" ref="pnfEventReadyConsumer"/>

+    <property name="dmaapClient" ref="pnfEventReadyDmaapClient"/>

   </bean>

 

   <bean id="cancelDmaapSubscription" class="org.openecomp.mso.bpmn.infrastructure.pnf.delegate.CancelDmaapSubscription">

-    <property name="dmaapClient" ref="pnfEventReadyConsumer"/>

+    <property name="dmaapClient" ref="pnfEventReadyDmaapClient"/>

   </bean>

 

-  <bean id="pnfEventReadyConsumer" class="org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.PnfEventReadyConsumer"

+  <bean id="pnfEventReadyDmaapClient" class="org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient"

     init-method="init">

     <property name="dmaapHost" value="${host}"/>

     <property name="dmaapPort" value="${port}"/>

diff --git a/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java b/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java
similarity index 93%
rename from bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java
rename to bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java
index 73b8247..393730e 100644
--- a/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java
+++ b/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java
@@ -43,8 +43,9 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread;
 
-public class PnfEventReadyConsumerTest {
+public class PnfEventReadyDmaapClientTest {
 
     private static final String CORRELATION_ID = "corrTestId";
     private static final String CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
@@ -61,14 +62,15 @@
     private static final String CONSUMER_ID = "consumerTestId";
     private static final String CONSUMER_GROUP = "consumerGroupTest";
 
-    private PnfEventReadyConsumer testedObject;
+    private PnfEventReadyDmaapClient testedObject;
+    private DmaapTopicListenerThread testedObjectInnerClassThread;
     private HttpClient httpClientMock;
     private Runnable threadMockToNotifyCamundaFlow;
     private ScheduledExecutorService executorMock;
 
     @Before
     public void init() throws NoSuchFieldException, IllegalAccessException {
-        testedObject = new PnfEventReadyConsumer();
+        testedObject = new PnfEventReadyDmaapClient();
         testedObject.setDmaapHost(HOST);
         testedObject.setDmaapPort(PORT);
         testedObject.setDmaapProtocol(PROTOCOL);
@@ -78,6 +80,7 @@
         testedObject.setConsumerGroup(CONSUMER_GROUP);
         testedObject.setDmaapClientDelayInSeconds(1);
         testedObject.init();
+        testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
         httpClientMock = mock(HttpClient.class);
         threadMockToNotifyCamundaFlow = mock(Runnable.class);
         executorMock = mock(ScheduledExecutorService.class);
@@ -96,7 +99,7 @@
             throws IOException {
         when(httpClientMock.execute(any(HttpGet.class))).
                 thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID)));
-        testedObject.sendRequest();
+        testedObjectInnerClassThread.run();
         ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
         verify(httpClientMock).execute(captor1.capture());
         assertThat(captor1.getValue().getURI()).hasHost(HOST).hasPort(PORT).hasScheme(PROTOCOL)
@@ -119,7 +122,7 @@
         when(httpClientMock.execute(any(HttpGet.class))).
                 thenReturn(createResponse(
                         String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID_NOT_FOUND_IN_MAP)));
-        testedObject.sendRequest();
+        testedObjectInnerClassThread.run();
         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
     }
 
@@ -133,7 +136,7 @@
     public void correlationIdIsNotFoundInHttpResponse() throws IOException {
         when(httpClientMock.execute(any(HttpGet.class))).
                 thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_CORRELATION_ID));
-        testedObject.sendRequest();
+        testedObjectInnerClassThread.run();
         verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
     }