Merge "PnfReadyEventConsumer implementation"
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
similarity index 78%
rename from bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java
rename to bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
index 1fd2de9..830574b 100644
--- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java
+++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
@@ -20,7 +20,6 @@
package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap;
-import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
@@ -38,7 +37,7 @@
import org.openecomp.mso.jsonpath.JsonPathUtil;
import org.openecomp.mso.logger.MsoLogger;
-public class PnfEventReadyConsumer implements DmaapClient {
+public class PnfEventReadyDmaapClient implements DmaapClient {
private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA);
@@ -57,7 +56,7 @@
private int dmaapClientDelayInSeconds;
private volatile boolean dmaapThreadListenerIsRunning;
- public PnfEventReadyConsumer() {
+ public PnfEventReadyDmaapClient() {
httpClient = HttpClientBuilder.create().build();
pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
executor = null;
@@ -67,17 +66,6 @@
getRequest = new HttpGet(buildURI());
}
- //TODO: extract this logic to separate class and test it there to avoid using VisibleForTesting
- @VisibleForTesting
- void sendRequest() {
- try {
- HttpResponse response = httpClient.execute(getRequest);
- getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound);
- } catch (IOException e) {
- LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e);
- }
- }
-
@Override
public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) {
pnfCorrelationIdToThreadMap.put(correlationId, informConsumer);
@@ -98,7 +86,7 @@
private synchronized void startDmaapThreadListener() {
if (!dmaapThreadListenerIsRunning) {
executor = Executors.newScheduledThreadPool(1);
- executor.scheduleWithFixedDelay(this::sendRequest, 0,
+ executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0,
dmaapClientDelayInSeconds, TimeUnit.SECONDS);
dmaapThreadListenerIsRunning = true;
}
@@ -120,24 +108,6 @@
.path(consumerGroup).path(consumerId).build();
}
- private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException {
- if (response.getStatusLine().getStatusCode() == 200) {
- String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
- if (responseString != null) {
- return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID);
- }
- }
- return Optional.empty();
- }
-
-
- private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
- Runnable runnable = unregister(correlationId);
- if (runnable != null) {
- runnable.run();
- }
- }
-
public void setDmaapHost(String dmaapHost) {
this.dmaapHost = dmaapHost;
}
@@ -170,4 +140,34 @@
this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds;
}
+ class DmaapTopicListenerThread implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ HttpResponse response = httpClient.execute(getRequest);
+ getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound);
+ } catch (IOException e) {
+ LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e);
+ }
+ }
+
+ private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException {
+ if (response.getStatusLine().getStatusCode() == 200) {
+ String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");
+ if (responseString != null) {
+ return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID);
+ }
+ }
+ return Optional.empty();
+ }
+
+ private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) {
+ Runnable runnable = unregister(correlationId);
+ if (runnable != null) {
+ runnable.run();
+ }
+ }
+ }
+
}
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml b/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml
index 4ddeba1..13ab4f8 100644
--- a/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml
+++ b/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml
@@ -15,14 +15,14 @@
</bean>
<bean id="informDmaapClient" class="org.openecomp.mso.bpmn.infrastructure.pnf.delegate.InformDmaapClient">
- <property name="dmaapClient" ref="pnfEventReadyConsumer"/>
+ <property name="dmaapClient" ref="pnfEventReadyDmaapClient"/>
</bean>
<bean id="cancelDmaapSubscription" class="org.openecomp.mso.bpmn.infrastructure.pnf.delegate.CancelDmaapSubscription">
- <property name="dmaapClient" ref="pnfEventReadyConsumer"/>
+ <property name="dmaapClient" ref="pnfEventReadyDmaapClient"/>
</bean>
- <bean id="pnfEventReadyConsumer" class="org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.PnfEventReadyConsumer"
+ <bean id="pnfEventReadyDmaapClient" class="org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient"
init-method="init">
<property name="dmaapHost" value="${host}"/>
<property name="dmaapPort" value="${port}"/>
diff --git a/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java b/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java
similarity index 93%
rename from bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java
rename to bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java
index 73b8247..393730e 100644
--- a/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumerTest.java
+++ b/bpmn/MSOInfrastructureBPMN/src/test/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java
@@ -43,8 +43,9 @@
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread;
-public class PnfEventReadyConsumerTest {
+public class PnfEventReadyDmaapClientTest {
private static final String CORRELATION_ID = "corrTestId";
private static final String CORRELATION_ID_NOT_FOUND_IN_MAP = "otherCorrId";
@@ -61,14 +62,15 @@
private static final String CONSUMER_ID = "consumerTestId";
private static final String CONSUMER_GROUP = "consumerGroupTest";
- private PnfEventReadyConsumer testedObject;
+ private PnfEventReadyDmaapClient testedObject;
+ private DmaapTopicListenerThread testedObjectInnerClassThread;
private HttpClient httpClientMock;
private Runnable threadMockToNotifyCamundaFlow;
private ScheduledExecutorService executorMock;
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
- testedObject = new PnfEventReadyConsumer();
+ testedObject = new PnfEventReadyDmaapClient();
testedObject.setDmaapHost(HOST);
testedObject.setDmaapPort(PORT);
testedObject.setDmaapProtocol(PROTOCOL);
@@ -78,6 +80,7 @@
testedObject.setConsumerGroup(CONSUMER_GROUP);
testedObject.setDmaapClientDelayInSeconds(1);
testedObject.init();
+ testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
httpClientMock = mock(HttpClient.class);
threadMockToNotifyCamundaFlow = mock(Runnable.class);
executorMock = mock(ScheduledExecutorService.class);
@@ -96,7 +99,7 @@
throws IOException {
when(httpClientMock.execute(any(HttpGet.class))).
thenReturn(createResponse(String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID)));
- testedObject.sendRequest();
+ testedObjectInnerClassThread.run();
ArgumentCaptor<HttpGet> captor1 = ArgumentCaptor.forClass(HttpGet.class);
verify(httpClientMock).execute(captor1.capture());
assertThat(captor1.getValue().getURI()).hasHost(HOST).hasPort(PORT).hasScheme(PROTOCOL)
@@ -119,7 +122,7 @@
when(httpClientMock.execute(any(HttpGet.class))).
thenReturn(createResponse(
String.format(JSON_EXAMPLE_WITH_CORRELATION_ID, CORRELATION_ID_NOT_FOUND_IN_MAP)));
- testedObject.sendRequest();
+ testedObjectInnerClassThread.run();
verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
}
@@ -133,7 +136,7 @@
public void correlationIdIsNotFoundInHttpResponse() throws IOException {
when(httpClientMock.execute(any(HttpGet.class))).
thenReturn(createResponse(JSON_EXAMPLE_WITH_NO_CORRELATION_ID));
- testedObject.sendRequest();
+ testedObjectInnerClassThread.run();
verifyZeroInteractions(threadMockToNotifyCamundaFlow, executorMock);
}