Publish VES event to MessageRouter via http

Change-Id: Ic5ed1fad1182e7343f213488c4015d2683ab8ddc
Issue-ID: DCAEGEN2-1273
Signed-off-by: emartin <ephraim.martin@est.tech>
diff --git a/pom.xml b/pom.xml
index ca6c426..31572a3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,7 @@
         <junit4.version>4.12</junit4.version>
         <jsonschema.version>1.3.0</jsonschema.version>
         <xerces.version>2.11.0</xerces.version>
+        <reactor.test>3.1.0.RELEASE</reactor.test>
         <!-- Plugin Versions -->
         <shade.plugin.version>3.2.0</shade.plugin.version>
         <jacoco.version>0.8.2</jacoco.version>
@@ -182,6 +183,12 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+            <version>${reactor.test}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.everit.json</groupId>
             <artifactId>org.everit.json.schema</artifactId>
             <version>${jsonschema.version}</version>
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 9abe086..03d42d5 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -38,6 +38,7 @@
 import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter;
 import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler;
 import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
+import org.onap.dcaegen2.services.pmmapper.messagerouter.VESPublisher;
 import org.onap.dcaegen2.services.pmmapper.model.Event;
 import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
 import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler;
@@ -73,6 +74,7 @@
         Mapper mapper = new Mapper(mappingTemplate);
         MeasSplitter splitter = new MeasSplitter(measConverter);
         XMLValidator validator = new XMLValidator(xmlSchema);
+        VESPublisher vesPublisher = new VESPublisher(mapperConfig);
 
         flux.onBackpressureDrop(App::handleBackPressure)
                 .doOnNext(App::receiveRequest)
@@ -86,6 +88,7 @@
                 .concatMap(event -> App.split(splitter,event, mapperConfig))
                 .filter(events -> App.filter(filterHandler, events, mapperConfig))
                 .concatMap(events -> App.map(mapper, events, mapperConfig))
+                .concatMap(vesPublisher::publish)
                 .subscribe(events -> logger.unwrap().info("Event Processed"));
 
         DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next, mapperConfig);
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
index f37bcd3..19a4750 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
@@ -135,7 +135,7 @@
 
     private JsonObject getBusControllerSubscribeBody(MapperConfig config) {
         JsonObject subscriberObj = new JsonObject();
-        subscriberObj.addProperty("dcaeLocationName", config.getDcaeLocation());
+        subscriberObj.addProperty("dcaeLocationName", config.getSubscriberDcaeLocation());
         subscriberObj.addProperty("deliveryURL", config.getBusControllerDeliveryUrl());
         subscriberObj.addProperty("feedId", config.getDmaapDRFeedId());
         subscriberObj.addProperty("lastMod", Instant.now().toString());
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java
new file mode 100644
index 0000000..6b5c157
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java
@@ -0,0 +1,28 @@
+/*-

+ * ============LICENSE_START=======================================================

+ *  Copyright (C) 2019 Nordix Foundation.

+ * ================================================================================

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+ *

+ *      http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ *

+ * SPDX-License-Identifier: Apache-2.0

+ * ============LICENSE_END=========================================================

+ */

+

+package org.onap.dcaegen2.services.pmmapper.exceptions;

+

+public class MRPublisherException extends RuntimeException{

+    public MRPublisherException(String message, Throwable cause) {

+        super(message, cause);

+    }

+

+}

diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java
new file mode 100644
index 0000000..77b0545
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java
@@ -0,0 +1,69 @@
+/*-

+ * ============LICENSE_START=======================================================

+ *  Copyright (C) 2019 Nordix Foundation.

+ * ================================================================================

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+ *

+ *      http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ *

+ * SPDX-License-Identifier: Apache-2.0

+ * ============LICENSE_END=========================================================

+ */

+

+package org.onap.dcaegen2.services.pmmapper.messagerouter;

+

+import java.util.List;

+

+import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException;

+import org.onap.dcaegen2.services.pmmapper.model.Event;

+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;

+import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;

+import org.onap.logging.ref.slf4j.ONAPLogAdapter;

+import org.slf4j.LoggerFactory;

+

+import reactor.core.publisher.Flux;

+

+public class VESPublisher {

+    private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(VESPublisher.class));

+    private RequestSender sender;

+    private MapperConfig config;

+

+    public VESPublisher(MapperConfig config) {

+        this(config, new RequestSender());

+    }

+

+    public VESPublisher(MapperConfig config, RequestSender sender) {

+        this.sender = sender;

+        this.config = config;

+    }

+

+    public Flux<Event> publish(List<Event> events) {

+        logger.unwrap().info("Publishing VES events to messagerouter.");

+        Event event = events.get(0);

+        try {

+            events.forEach(e -> this.publish(e.getVes()));

+            logger.unwrap().info("Successfully published VES events to messagerouter.");

+        } catch(MRPublisherException e) {

+            logger.unwrap().error("Failed to publish VES event(s) to messagerouter. {}", e.getMessage());

+            return Flux.empty();

+        }

+        return Flux.just(event);

+    }

+

+    private void publish(String ves) {

+        try {

+            String topicUrl = config.getPublisherTopicUrl();

+            sender.send("POST", topicUrl, ves);

+        } catch (Exception e) {

+            throw new MRPublisherException(e.getMessage(), e);

+        }

+    }

+}

diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
index d28d850..ffb09ba 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
@@ -82,6 +82,14 @@
         return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getSubscriberId();

     }

 

+    public String getSubscriberDcaeLocation() {

+        return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getLocation();

+    }

+

+    public String getPublisherTopicUrl() {

+        return this.getStreamsPublishes().getDmaapPublisher().getDmaapInfo().getTopicUrl();

+    }

+

     public boolean dmaapInfoEquals(MapperConfig mapperConfig){

         return this

                 .getStreamsSubscribes()

diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
index 3380aca..658f820 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
@@ -20,10 +20,13 @@
 package org.onap.dcaegen2.services.pmmapper.utils;

 

 import java.io.BufferedReader;

+import java.io.IOException;

 import java.io.InputStream;

 import java.io.InputStreamReader;

+import java.io.OutputStream;

 import java.net.HttpURLConnection;

 import java.net.URL;

+import java.nio.charset.StandardCharsets;

 import java.util.UUID;

 import java.util.stream.Collectors;

 

@@ -39,41 +42,47 @@
     private static final int ERROR_START_RANGE = 300;

     private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class));

     public static final String DELETE = "DELETE";

+    public static final String DEFAULT_CONTENT_TYPE = "text/plain";

+    public static final int DEFAULT_READ_TIMEOUT = 5000;

 

     /**

-     * Sends an Http GET request to a given endpoint.

-     *

-     * @return http response body

-     * @throws Exception

-     * @throws InterruptedException

+     * Works just like {@link RequestSender#send(method,urlString)}, except {@code method }

+     * is set to {@code GET} by default.

+     * @see RequestSender#send(String,String,String)

      */

-

     public String send(final String urlString) throws Exception {

         return send("GET", urlString);

     }

 

+    /**

+     * Works just like {@link RequestSender#send(method,urlString,body)}, except {@code body }

+     * is set to empty String by default.

+     * @see RequestSender#send(String,String,String)

+     */

+    public String send(String method, final String urlString) throws Exception {

+       return send(method,urlString,"");

+    }

 

     /**

-     * Sends a request to a given endpoint.

+     * Sends an http request to a given endpoint.

      * @param method of the outbound request

      * @param urlString representing given endpoint

+     * @param body of the request as json

      * @return http response body

      * @throws Exception

      */

-    public String send(String method, final String urlString) throws Exception {

+    public String send(String method, final String urlString, final String body) throws Exception {

         final UUID invocationID = logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS);

         final UUID requestID = UUID.randomUUID();

         String result = "";

 

         for (int i = 1; i <= MAX_RETRIES; i++) {

-            URL url = new URL(urlString);

-            HttpURLConnection connection = (HttpURLConnection) url.openConnection();

-            connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());

-            connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());

-            connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);

-            connection.setRequestMethod(method);

-            logger.unwrap()

-                    .info("Sending:\n{}", connection.getRequestProperties());

+            final URL url = new URL(urlString);

+            final HttpURLConnection connection = getHttpURLConnection(method, url, invocationID, requestID);

+            if(!body.isEmpty()) {

+                setMessageBody(connection, body);

+            }

+            logger.unwrap().info("Sending {} request to {}.", method, urlString);

 

             try (InputStream is = connection.getInputStream();

                     BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {

@@ -81,15 +90,12 @@
                         .collect(Collectors.joining("\n"));

                 int responseCode = connection.getResponseCode();

                 if (!(isWithinErrorRange(responseCode))) {

-                    logger.unwrap()

-                            .info("Received:\n{}", result);

+                    logger.unwrap().info("Server Response Received:\n{}", result);

                     break;

                 }

-

             } catch (Exception e) {

                 if (retryLimitReached(i)) {

-                    logger.unwrap()

-                            .error("Execution error: "+connection.getResponseMessage(), e);

+                    logger.unwrap().error("Execution error: "+connection.getResponseMessage(), e);

                     throw new Exception(SERVER_ERROR_MESSAGE + ": " + connection.getResponseMessage(), e);

                 }

             }

@@ -99,6 +105,26 @@
         return result;

     }

 

+    private HttpURLConnection getHttpURLConnection(String method, URL url, UUID invocationID, UUID requestID) throws Exception {

+        HttpURLConnection connection = (HttpURLConnection) url.openConnection();

+        connection.setReadTimeout(DEFAULT_READ_TIMEOUT);

+        connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());

+        connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());

+        connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);

+        connection.setRequestMethod(method);

+

+        return connection;

+    }

+

+    private void setMessageBody(HttpURLConnection connection, String body) throws IOException {

+        connection.setRequestProperty("Content-Type",DEFAULT_CONTENT_TYPE);

+        connection.setDoOutput(true);

+        OutputStream outputStream = connection.getOutputStream();

+        outputStream.write(body.getBytes(StandardCharsets.UTF_8));

+        outputStream.flush();

+        outputStream.close();

+    }

+

     private boolean retryLimitReached(final int retryCount) {

         return retryCount >= MAX_RETRIES;

     }

diff --git a/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java b/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java
new file mode 100644
index 0000000..69d34f8
--- /dev/null
+++ b/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java
@@ -0,0 +1,89 @@
+/*-

+ * ============LICENSE_START=======================================================

+ *  Copyright (C) 2019 Nordix Foundation.

+ * ================================================================================

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+ *

+ *      http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ *

+ * SPDX-License-Identifier: Apache-2.0

+ * ============LICENSE_END=========================================================

+ */

+package org.onap.dcaegen2.pmmapper.messagerouter;

+import static org.junit.jupiter.api.Assertions.assertThrows;

+import static org.mockito.Mockito.mock;

+import static org.mockito.Mockito.times;

+import static org.mockito.Mockito.verify;

+import static org.mockito.Mockito.when;

+import reactor.test.StepVerifier;

+import java.util.Arrays;

+import java.util.List;

+

+import org.junit.Before;

+import org.junit.Test;

+import org.junit.runner.RunWith;

+import org.mockito.Mockito;

+import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException;

+import org.onap.dcaegen2.services.pmmapper.messagerouter.VESPublisher;

+import org.onap.dcaegen2.services.pmmapper.model.EnvironmentConfig;

+import org.onap.dcaegen2.services.pmmapper.model.Event;

+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;

+import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;

+import org.powermock.core.classloader.annotations.PrepareForTest;

+import org.powermock.modules.junit4.PowerMockRunner;

+import reactor.core.publisher.Flux;

+

+@RunWith(PowerMockRunner.class)

+@PrepareForTest(EnvironmentConfig.class)

+public class VESPublisherTest {

+

+    private static String topicURL = "http://mr/topic";

+    private static RequestSender sender;

+    private static MapperConfig config;

+    private VESPublisher sut;

+    private String ves = "{}";

+

+    @Before

+    public void before() throws Exception {

+        config = mock(MapperConfig.class);

+        sender = mock(RequestSender.class);

+        sut = new VESPublisher(config, sender);

+        when(config.getPublisherTopicUrl()).thenReturn(topicURL);

+    }

+

+    @Test

+    public void publish_multiple_success() throws Exception {

+        Event event = mock(Event.class);

+        List<Event> events  = Arrays.asList(event,event,event);

+        when(event.getVes()).thenReturn(ves);

+

+        Flux<Event> flux = sut.publish(events);

+

+        verify(sender, times(3)).send(Mockito.anyString(),Mockito.anyString(), Mockito.anyString());

+        StepVerifier.create(flux)

+            .expectNextMatches(event::equals)

+            .expectComplete()

+            .verify();

+    }

+

+    @Test

+    public void publish_multiple_fail() throws Exception {

+        Event event = mock(Event.class);

+        List<Event> events  = Arrays.asList(event,event,event);

+        when(event.getVes()).thenReturn(ves);

+        when(sender.send("POST",topicURL,ves)).thenThrow(Exception.class);

+

+        Flux<Event> flux = sut.publish(events);

+

+        StepVerifier.create(flux)

+            .verifyComplete();

+    }

+}