Publish VES event to MessageRouter via http
Change-Id: Ic5ed1fad1182e7343f213488c4015d2683ab8ddc
Issue-ID: DCAEGEN2-1273
Signed-off-by: emartin <ephraim.martin@est.tech>
diff --git a/pom.xml b/pom.xml
index ca6c426..31572a3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,7 @@
<junit4.version>4.12</junit4.version>
<jsonschema.version>1.3.0</jsonschema.version>
<xerces.version>2.11.0</xerces.version>
+ <reactor.test>3.1.0.RELEASE</reactor.test>
<!-- Plugin Versions -->
<shade.plugin.version>3.2.0</shade.plugin.version>
<jacoco.version>0.8.2</jacoco.version>
@@ -182,6 +183,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ <version>${reactor.test}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.everit.json</groupId>
<artifactId>org.everit.json.schema</artifactId>
<version>${jsonschema.version}</version>
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 9abe086..03d42d5 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -38,6 +38,7 @@
import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter;
import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler;
import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
+import org.onap.dcaegen2.services.pmmapper.messagerouter.VESPublisher;
import org.onap.dcaegen2.services.pmmapper.model.Event;
import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
import org.onap.dcaegen2.services.pmmapper.healthcheck.HealthCheckHandler;
@@ -73,6 +74,7 @@
Mapper mapper = new Mapper(mappingTemplate);
MeasSplitter splitter = new MeasSplitter(measConverter);
XMLValidator validator = new XMLValidator(xmlSchema);
+ VESPublisher vesPublisher = new VESPublisher(mapperConfig);
flux.onBackpressureDrop(App::handleBackPressure)
.doOnNext(App::receiveRequest)
@@ -86,6 +88,7 @@
.concatMap(event -> App.split(splitter,event, mapperConfig))
.filter(events -> App.filter(filterHandler, events, mapperConfig))
.concatMap(events -> App.map(mapper, events, mapperConfig))
+ .concatMap(vesPublisher::publish)
.subscribe(events -> logger.unwrap().info("Event Processed"));
DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next, mapperConfig);
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
index f37bcd3..19a4750 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
@@ -135,7 +135,7 @@
private JsonObject getBusControllerSubscribeBody(MapperConfig config) {
JsonObject subscriberObj = new JsonObject();
- subscriberObj.addProperty("dcaeLocationName", config.getDcaeLocation());
+ subscriberObj.addProperty("dcaeLocationName", config.getSubscriberDcaeLocation());
subscriberObj.addProperty("deliveryURL", config.getBusControllerDeliveryUrl());
subscriberObj.addProperty("feedId", config.getDmaapDRFeedId());
subscriberObj.addProperty("lastMod", Instant.now().toString());
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java
new file mode 100644
index 0000000..6b5c157
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/MRPublisherException.java
@@ -0,0 +1,28 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.exceptions;
+
+public class MRPublisherException extends RuntimeException{
+ public MRPublisherException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java
new file mode 100644
index 0000000..77b0545
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/messagerouter/VESPublisher.java
@@ -0,0 +1,69 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.messagerouter;
+
+import java.util.List;
+
+import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+
+public class VESPublisher {
+ private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(VESPublisher.class));
+ private RequestSender sender;
+ private MapperConfig config;
+
+ public VESPublisher(MapperConfig config) {
+ this(config, new RequestSender());
+ }
+
+ public VESPublisher(MapperConfig config, RequestSender sender) {
+ this.sender = sender;
+ this.config = config;
+ }
+
+ public Flux<Event> publish(List<Event> events) {
+ logger.unwrap().info("Publishing VES events to messagerouter.");
+ Event event = events.get(0);
+ try {
+ events.forEach(e -> this.publish(e.getVes()));
+ logger.unwrap().info("Successfully published VES events to messagerouter.");
+ } catch(MRPublisherException e) {
+ logger.unwrap().error("Failed to publish VES event(s) to messagerouter. {}", e.getMessage());
+ return Flux.empty();
+ }
+ return Flux.just(event);
+ }
+
+ private void publish(String ves) {
+ try {
+ String topicUrl = config.getPublisherTopicUrl();
+ sender.send("POST", topicUrl, ves);
+ } catch (Exception e) {
+ throw new MRPublisherException(e.getMessage(), e);
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
index d28d850..ffb09ba 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
@@ -82,6 +82,14 @@
return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getSubscriberId();
}
+ public String getSubscriberDcaeLocation() {
+ return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getLocation();
+ }
+
+ public String getPublisherTopicUrl() {
+ return this.getStreamsPublishes().getDmaapPublisher().getDmaapInfo().getTopicUrl();
+ }
+
public boolean dmaapInfoEquals(MapperConfig mapperConfig){
return this
.getStreamsSubscribes()
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
index 3380aca..658f820 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequestSender.java
@@ -20,10 +20,13 @@
package org.onap.dcaegen2.services.pmmapper.utils;
import java.io.BufferedReader;
+import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -39,41 +42,47 @@
private static final int ERROR_START_RANGE = 300;
private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(RequestSender.class));
public static final String DELETE = "DELETE";
+ public static final String DEFAULT_CONTENT_TYPE = "text/plain";
+ public static final int DEFAULT_READ_TIMEOUT = 5000;
/**
- * Sends an Http GET request to a given endpoint.
- *
- * @return http response body
- * @throws Exception
- * @throws InterruptedException
+ * Works just like {@link RequestSender#send(method,urlString)}, except {@code method }
+ * is set to {@code GET} by default.
+ * @see RequestSender#send(String,String,String)
*/
-
public String send(final String urlString) throws Exception {
return send("GET", urlString);
}
+ /**
+ * Works just like {@link RequestSender#send(method,urlString,body)}, except {@code body }
+ * is set to empty String by default.
+ * @see RequestSender#send(String,String,String)
+ */
+ public String send(String method, final String urlString) throws Exception {
+ return send(method,urlString,"");
+ }
/**
- * Sends a request to a given endpoint.
+ * Sends an http request to a given endpoint.
* @param method of the outbound request
* @param urlString representing given endpoint
+ * @param body of the request as json
* @return http response body
* @throws Exception
*/
- public String send(String method, final String urlString) throws Exception {
+ public String send(String method, final String urlString, final String body) throws Exception {
final UUID invocationID = logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS);
final UUID requestID = UUID.randomUUID();
String result = "";
for (int i = 1; i <= MAX_RETRIES; i++) {
- URL url = new URL(urlString);
- HttpURLConnection connection = (HttpURLConnection) url.openConnection();
- connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());
- connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());
- connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);
- connection.setRequestMethod(method);
- logger.unwrap()
- .info("Sending:\n{}", connection.getRequestProperties());
+ final URL url = new URL(urlString);
+ final HttpURLConnection connection = getHttpURLConnection(method, url, invocationID, requestID);
+ if(!body.isEmpty()) {
+ setMessageBody(connection, body);
+ }
+ logger.unwrap().info("Sending {} request to {}.", method, urlString);
try (InputStream is = connection.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
@@ -81,15 +90,12 @@
.collect(Collectors.joining("\n"));
int responseCode = connection.getResponseCode();
if (!(isWithinErrorRange(responseCode))) {
- logger.unwrap()
- .info("Received:\n{}", result);
+ logger.unwrap().info("Server Response Received:\n{}", result);
break;
}
-
} catch (Exception e) {
if (retryLimitReached(i)) {
- logger.unwrap()
- .error("Execution error: "+connection.getResponseMessage(), e);
+ logger.unwrap().error("Execution error: "+connection.getResponseMessage(), e);
throw new Exception(SERVER_ERROR_MESSAGE + ": " + connection.getResponseMessage(), e);
}
}
@@ -99,6 +105,26 @@
return result;
}
+ private HttpURLConnection getHttpURLConnection(String method, URL url, UUID invocationID, UUID requestID) throws Exception {
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ connection.setReadTimeout(DEFAULT_READ_TIMEOUT);
+ connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());
+ connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());
+ connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);
+ connection.setRequestMethod(method);
+
+ return connection;
+ }
+
+ private void setMessageBody(HttpURLConnection connection, String body) throws IOException {
+ connection.setRequestProperty("Content-Type",DEFAULT_CONTENT_TYPE);
+ connection.setDoOutput(true);
+ OutputStream outputStream = connection.getOutputStream();
+ outputStream.write(body.getBytes(StandardCharsets.UTF_8));
+ outputStream.flush();
+ outputStream.close();
+ }
+
private boolean retryLimitReached(final int retryCount) {
return retryCount >= MAX_RETRIES;
}
diff --git a/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java b/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java
new file mode 100644
index 0000000..69d34f8
--- /dev/null
+++ b/src/test/java/org/onap/dcaegen2/pmmapper/messagerouter/VESPublisherTest.java
@@ -0,0 +1,89 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.pmmapper.messagerouter;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import reactor.test.StepVerifier;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.onap.dcaegen2.services.pmmapper.exceptions.MRPublisherException;
+import org.onap.dcaegen2.services.pmmapper.messagerouter.VESPublisher;
+import org.onap.dcaegen2.services.pmmapper.model.EnvironmentConfig;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
+import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import reactor.core.publisher.Flux;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(EnvironmentConfig.class)
+public class VESPublisherTest {
+
+ private static String topicURL = "http://mr/topic";
+ private static RequestSender sender;
+ private static MapperConfig config;
+ private VESPublisher sut;
+ private String ves = "{}";
+
+ @Before
+ public void before() throws Exception {
+ config = mock(MapperConfig.class);
+ sender = mock(RequestSender.class);
+ sut = new VESPublisher(config, sender);
+ when(config.getPublisherTopicUrl()).thenReturn(topicURL);
+ }
+
+ @Test
+ public void publish_multiple_success() throws Exception {
+ Event event = mock(Event.class);
+ List<Event> events = Arrays.asList(event,event,event);
+ when(event.getVes()).thenReturn(ves);
+
+ Flux<Event> flux = sut.publish(events);
+
+ verify(sender, times(3)).send(Mockito.anyString(),Mockito.anyString(), Mockito.anyString());
+ StepVerifier.create(flux)
+ .expectNextMatches(event::equals)
+ .expectComplete()
+ .verify();
+ }
+
+ @Test
+ public void publish_multiple_fail() throws Exception {
+ Event event = mock(Event.class);
+ List<Event> events = Arrays.asList(event,event,event);
+ when(event.getVes()).thenReturn(ves);
+ when(sender.send("POST",topicURL,ves)).thenThrow(Exception.class);
+
+ Flux<Event> flux = sut.publish(events);
+
+ StepVerifier.create(flux)
+ .verifyComplete();
+ }
+}