Add DataRouter Subscriber
Change-Id: I7f81c3d7249dcfcf00e2c7f3272f478d2346397d
Issue-ID: DCAEGEN2-1079
Signed-off-by: JoeOLeary <joseph.o.leary@est.tech>
diff --git a/lombok.config b/lombok.config
new file mode 100644
index 0000000..8f7e8aa
--- /dev/null
+++ b/lombok.config
@@ -0,0 +1 @@
+lombok.addLombokGeneratedAnnotation = true
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..266a8c0
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,207 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ============LICENSE_START=======================================================
+ Copyright (C) 2019 Nordix Foundation.
+ ================================================================================
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+ SPDX-License-Identifier: Apache-2.0
+ ============LICENSE_END=========================================================
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.onap.dcaegen2.services</groupId>
+ <artifactId>pm-mapper</artifactId>
+ <version>1.0-SNAPSHOT</version>
+
+ <properties>
+ <!-- Dependency Versions -->
+ <lombok.version>1.18.4</lombok.version>
+ <sl4j.version>1.7.25</sl4j.version>
+ <logback.version>1.2.3</logback.version>
+ <reactor.version>3.2.3.RELEASE</reactor.version>
+ <undertow.version>2.0.16.Final</undertow.version>
+ <gson.version>2.8.5</gson.version>
+ <!-- Testing.Test Dependencies -->
+ <junit.version>5.3.2</junit.version>
+ <mockito.version>2.23.4</mockito.version>
+ <mockito-ju5-ext.version>2.23.4</mockito-ju5-ext.version>
+ <powermock.version>2.0.0</powermock.version>
+ <junit4.version>4.12</junit4.version>
+ <!-- Plugin Versions -->
+ <shade.plugin.version>3.2.0</shade.plugin.version>
+ <jacoco.version>0.8.2</jacoco.version>
+ <!-- Plugin Settings -->
+ <surefire.version>2.22.0</surefire.version>
+ <compiler.target.version>1.8</compiler.target.version>
+ <compiler.source.version>1.8</compiler.source.version>
+ <shade.main>org.onap.dcaegen2.services.pmmapper.App</shade.main>
+ <shade.transformer>org.apache.maven.plugins.shade.resource.ManifestResourceTransformer</shade.transformer>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>${lombok.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ <version>${reactor.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.undertow</groupId>
+ <artifactId>undertow-core</artifactId>
+ <version>${undertow.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${sl4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>${gson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <version>${mockito-ju5-ext.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.vintage</groupId>
+ <artifactId>junit-vintage-engine</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito2</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${surefire.version}</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>${surefire.version}</version>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>${shade.plugin.version}</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <transformers>
+ <transformer implementation="${shade.transformer}">
+ <mainClass>${shade.main}</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>8</source>
+ <target>8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ <version>${jacoco.version}</version>
+ <executions>
+ <execution>
+ <id>jacoco-instrument</id>
+ <goals>
+ <goal>instrument</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>jacoco-restore-instrumented-classes</id>
+ <goals>
+ <goal>restore-instrumented-classes</goal>
+ </goals>
+ </execution>
+ <execution>
+ <goals>
+ <goal>prepare-agent</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>report</id>
+ <phase>prepare-package</phase>
+ <goals>
+ <goal>report</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <excludes>
+ <exclude>**/*App.*</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
new file mode 100644
index 0000000..2b93d03
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -0,0 +1,50 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper;
+
+import io.undertow.Handlers;
+import io.undertow.Undertow;
+import io.undertow.util.StatusCodes;
+import org.onap.dcaegen2.services.pmmapper.config.BusControllerConfig;
+import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber;
+import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+public class App {
+
+ public static void main(String[] args) throws MalformedURLException, InterruptedException, TooManyTriesException {
+ DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(event -> {
+ event.getHttpServerExchange().unDispatch();
+ event.getHttpServerExchange().getResponseSender().send(StatusCodes.OK_STRING);
+ System.out.println(event.getMetadata().getProductName());
+ });
+ BusControllerConfig config = new BusControllerConfig();
+ config.setDataRouterSubscribeEndpoint(new URL("http://" + System.getenv("DMAAP_BC_SERVICE_HOST") + ":" + System.getenv("DMAAP_BC_SERVICE_PORT") + "/webapi/dr_subs"));
+ dataRouterSubscriber.start(config);
+
+ Undertow.builder()
+ .addHttpListener(8081, "0.0.0.0")
+ .setHandler(Handlers.routing().add("put", "/sub", dataRouterSubscriber))
+ .build().start();
+ }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/BusControllerConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/BusControllerConfig.java
new file mode 100644
index 0000000..63b2a32
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/BusControllerConfig.java
@@ -0,0 +1,40 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.config;
+
+import lombok.Data;
+
+import java.net.URL;
+
+/**
+ * Stub for BusControllerConfiguration object.
+ */
+@Data
+public class BusControllerConfig {
+
+ private String dcaeLocation = "dcaeLocation";
+ private String deliveryURL = "deliveryURL";
+ private int feedId = 2;
+ private String lastMod = "lastMod";
+ private String username = "username";
+ private String password = "password";
+ private URL dataRouterSubscribeEndpoint;
+
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
new file mode 100644
index 0000000..1d27d3b
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
@@ -0,0 +1,177 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.datarouter;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import io.undertow.util.HeaderValues;
+import lombok.Data;
+import lombok.NonNull;
+import org.onap.dcaegen2.services.pmmapper.config.BusControllerConfig;
+import org.onap.dcaegen2.services.pmmapper.exceptions.NoMetadataException;
+import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
+import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.StatusCodes;
+
+import lombok.extern.slf4j.Slf4j;
+import org.onap.dcaegen2.services.pmmapper.utils.RequiredFieldDeserializer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.net.HttpURLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.Random;
+
+/**
+ * Subscriber for events sent from data router
+ * Provides an undertow HttpHandler to be used as an endpoint for data router to send events to.
+ */
+@Slf4j
+@Data
+public class DataRouterSubscriber implements HttpHandler {
+
+ private static final int NUMBER_OF_ATTEMPTS = 5;
+ private static final int DEFAULT_TIMEOUT = 2000;
+ private static final int MAX_JITTER = 50;
+
+ private static final String METADATA_HEADER = "X-ATT-DR-META";
+ private static final String BAD_METADATA_MESSAGE = "Malformed Metadata.";
+ private static final String NO_METADATA_MESSAGE = "Missing Metadata.";
+
+ private boolean limited = false;
+ private Random jitterGenerator;
+ private Gson metadataBuilder;
+ @NonNull
+ private EventReceiver eventReceiver;
+
+ /**
+ * @param eventReceiver receiver for any inbound events.
+ */
+ public DataRouterSubscriber(EventReceiver eventReceiver) {
+ this.eventReceiver = eventReceiver;
+ this.jitterGenerator = new Random();
+ this.metadataBuilder = new GsonBuilder().registerTypeAdapter(EventMetadata.class, new RequiredFieldDeserializer<EventMetadata>())
+ .create();
+ }
+
+ /**
+ * Starts data flow by subscribing to data router through bus controller.
+ *
+ * @param config configuration object containing bus controller endpoint for subscription and
+ * all non constant configuration for subscription through this endpoint.
+ * @throws TooManyTriesException in the event that timeout has occurred several times.
+ */
+ public void start(BusControllerConfig config) throws TooManyTriesException, InterruptedException {
+ subscribe(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, config);
+ }
+
+ private HttpURLConnection getBusControllerConnection(BusControllerConfig config, int timeout) throws IOException {
+ HttpURLConnection connection = (HttpURLConnection) config.getDataRouterSubscribeEndpoint()
+ .openConnection();
+ connection.setRequestMethod("POST");
+ connection.setConnectTimeout(timeout);
+ connection.setReadTimeout(timeout);
+ connection.setRequestProperty("Content-Type", "application/json");
+ connection.setDoOutput(true);
+ return connection;
+ }
+
+ private JsonObject getBusControllerSubscribeBody(BusControllerConfig config) {
+ JsonObject subscriberObj = new JsonObject();
+ subscriberObj.addProperty("dcaeLocationName", config.getDcaeLocation());
+ subscriberObj.addProperty("deliveryURL", config.getDeliveryURL());
+ subscriberObj.addProperty("feedId", config.getFeedId());
+ subscriberObj.addProperty("lastMod", config.getLastMod());
+ subscriberObj.addProperty("username", config.getUsername());
+ subscriberObj.addProperty("userpwd", config.getPassword());
+ return subscriberObj;
+ }
+
+ private void subscribe(int attempts, int timeout, BusControllerConfig config) throws TooManyTriesException, InterruptedException {
+ int subResponse = 504;
+ String subMessage = "";
+ try {
+ HttpURLConnection connection = getBusControllerConnection(config, timeout);
+
+ try (OutputStream bodyStream = connection.getOutputStream();
+ OutputStreamWriter bodyWriter = new OutputStreamWriter(bodyStream, StandardCharsets.UTF_8)) {
+ bodyWriter.write(getBusControllerSubscribeBody(config).toString());
+ }
+ subResponse = connection.getResponseCode();
+ subMessage = connection.getResponseMessage();
+ } catch (IOException e) {
+ log.info("Timeout Failure:", e);
+ }
+ log.info("Request to bus controller executed with Response Code: '{}' and Response Event: '{}'.", subResponse, subMessage);
+ if (subResponse >= 300 && attempts > 1) {
+ Thread.sleep(timeout);
+ subscribe(--attempts, (timeout * 2) + jitterGenerator.nextInt(MAX_JITTER), config);
+ } else if (subResponse >= 300) {
+ throw new TooManyTriesException("Failed to subscribe within appropriate amount of attempts");
+ }
+ }
+
+ /**
+ * Receives inbound requests, verifies that required headers are valid
+ * and passes an Event onto the eventReceiver.
+ * The forwarded httpServerExchange response is the responsibility of the eventReceiver.
+ *
+ * @param httpServerExchange inbound http server exchange.
+ */
+ @Override
+ public void handleRequest(HttpServerExchange httpServerExchange) {
+ if (limited) {
+ httpServerExchange.setStatusCode(StatusCodes.SERVICE_UNAVAILABLE)
+ .getResponseSender()
+ .send(StatusCodes.SERVICE_UNAVAILABLE_STRING);
+ } else {
+ try {
+ String metadataAsString = Optional.of(httpServerExchange.getRequestHeaders()
+ .get(METADATA_HEADER))
+ .map((HeaderValues headerValues) -> headerValues.get(0))
+ .orElseThrow(() -> new NoMetadataException("Metadata Not found"));
+
+ EventMetadata metadata = metadataBuilder.fromJson(metadataAsString, EventMetadata.class);
+ httpServerExchange.getRequestReceiver()
+ .receiveFullString((callbackExchange, body) -> {
+ httpServerExchange.dispatch(() -> eventReceiver.receive(new Event(callbackExchange, body, metadata)));
+ });
+ } catch (NoMetadataException exception) {
+ log.info("Bad Request: no metadata found under '{}' header.", METADATA_HEADER, exception);
+ httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST)
+ .getResponseSender()
+ .send(NO_METADATA_MESSAGE);
+ } catch (JsonParseException exception) {
+ log.info("Bad Request: Failure to parse metadata", exception);
+ httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST)
+ .getResponseSender()
+ .send(BAD_METADATA_MESSAGE);
+ }
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/EventReceiver.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/EventReceiver.java
new file mode 100644
index 0000000..77c8153
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/EventReceiver.java
@@ -0,0 +1,31 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.datarouter;
+
+
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+
+/**
+ * Sink for Events received from the data router subscriber.
+ */
+public interface EventReceiver {
+ void receive(Event event);
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/NoMetadataException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/NoMetadataException.java
new file mode 100644
index 0000000..280b9da
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/NoMetadataException.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.exceptions;
+
+public class NoMetadataException extends Exception {
+ public NoMetadataException(String errorMessage) {
+ super(errorMessage);
+ }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/TooManyTriesException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/TooManyTriesException.java
new file mode 100644
index 0000000..922239b
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/TooManyTriesException.java
@@ -0,0 +1,29 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.exceptions;
+
+/**
+ * Exception indicates that a task has been attempted too many times.
+ */
+public class TooManyTriesException extends Exception {
+ public TooManyTriesException(String errorMessage){
+ super(errorMessage);
+ }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
new file mode 100644
index 0000000..a08dcfb
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
@@ -0,0 +1,37 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.model;
+
+import io.undertow.server.HttpServerExchange;
+import lombok.Data;
+import lombok.NonNull;
+
+/**
+ * Class used to pass around relevant inbound event data.
+ */
+@Data
+public class Event {
+ @NonNull
+ private HttpServerExchange httpServerExchange;
+ @NonNull
+ private String body;
+ @NonNull
+ private EventMetadata metadata;
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
new file mode 100644
index 0000000..601b00f
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
@@ -0,0 +1,50 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.model;
+
+import lombok.Data;
+import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired;
+
+/**
+ * Metadata for inbound event onto data router subscriber.
+ */
+@Data
+public class EventMetadata {
+ @GSONRequired
+ private String productName;
+ @GSONRequired
+ private String vendorName;
+ @GSONRequired
+ private String startEpochMicrosec;
+ @GSONRequired
+ private String lastEpochMicrosec;
+ @GSONRequired
+ private String sourceName;
+ @GSONRequired
+ private String timeZoneOffset;
+ @GSONRequired
+ private String location;
+ @GSONRequired
+ private String compression;
+ @GSONRequired
+ private String fileFormatType;
+ @GSONRequired
+ private String fileFormatVersion;
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/GSONRequired.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/GSONRequired.java
new file mode 100644
index 0000000..a6ce7b9
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/GSONRequired.java
@@ -0,0 +1,33 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation used to make a field required for Gson.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface GSONRequired {
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java
new file mode 100644
index 0000000..e956398
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java
@@ -0,0 +1,57 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Type;
+
+
+/**
+ * Extension of the default deserializer with support for GSONRequired annotation.
+ * @param <T> Type of object for deserialization process.
+ */
+public class RequiredFieldDeserializer<T> implements JsonDeserializer<T> {
+
+ @Override
+ public T deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
+ T obj = new Gson().fromJson(jsonElement, type);
+ for (Field field : obj.getClass().getDeclaredFields()) {
+ if (field.getAnnotation(GSONRequired.class) != null) {
+ field.setAccessible(true);
+ try {
+ if (field.get(obj) == null) {
+ throw new JsonParseException(String.format("Field: '%s', is required but not found.", field.getName()));
+ }
+ } catch (Exception exception) {
+ throw new JsonParseException("Failed to check fields.", exception);
+ }
+ }
+ }
+
+ return obj;
+ }
+
+}
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java
new file mode 100644
index 0000000..8f73c91
--- /dev/null
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java
@@ -0,0 +1,225 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.datarouter;
+
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.onap.dcaegen2.services.pmmapper.config.BusControllerConfig;
+import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
+import io.undertow.io.Receiver;
+import io.undertow.io.Sender;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.StatusCodes;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DataRouterSubscriber.class)
+public class DataRouterSubscriberTest {
+
+
+ @Mock
+ private EventReceiver eventReceiver;
+
+ private DataRouterSubscriber objUnderTest;
+
+ @Before
+ public void setUp() {
+ objUnderTest = new DataRouterSubscriber(eventReceiver);
+ }
+
+ @Test
+ public void testStartTooManyTriesWithResponse() throws IOException {
+ PowerMockito.mockStatic(Thread.class);
+
+ URL subURL = mock(URL.class);
+ BusControllerConfig config = new BusControllerConfig();
+ config.setDataRouterSubscribeEndpoint(subURL);
+ HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+ when(subURL.openConnection()).thenReturn(huc);
+ when(huc.getResponseCode()).thenReturn(300);
+ Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start(config));
+ }
+
+ @Test
+ public void testStartImmediateSuccess() throws IOException, TooManyTriesException, InterruptedException {
+ URL subURL = mock(URL.class);
+ BusControllerConfig config = new BusControllerConfig();
+ config.setDataRouterSubscribeEndpoint(subURL);
+ HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+ when(subURL.openConnection()).thenReturn(huc);
+ when(huc.getResponseCode()).thenReturn(200);
+ objUnderTest.start(config);
+ verify(huc, times(1)).getResponseCode();
+ }
+
+ @Test
+ public void testStartDelayedSuccess() throws IOException, TooManyTriesException, InterruptedException {
+ PowerMockito.mockStatic(Thread.class);
+
+ URL subURL = mock(URL.class);
+ BusControllerConfig config = new BusControllerConfig();
+ config.setDataRouterSubscribeEndpoint(subURL);
+ HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+ when(subURL.openConnection()).thenReturn(huc);
+ doAnswer(new Answer() {
+ boolean forceRetry = true;
+
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ if (forceRetry) {
+ forceRetry = false;
+ throw new IOException();
+ }
+ return 200;
+ }
+ }).when(huc).getResponseCode();
+ objUnderTest.start(config);
+ verify(huc, times(2)).getResponseCode();
+ }
+
+ @Test
+ public void testStartReadTimeout() throws IOException {
+ PowerMockito.mockStatic(Thread.class);
+
+ URL subURL = mock(URL.class);
+ BusControllerConfig config = new BusControllerConfig();
+ config.setDataRouterSubscribeEndpoint(subURL);
+ HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+ when(subURL.openConnection()).thenReturn(huc);
+ doThrow(new IOException()).when(huc).getResponseCode();
+ Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start(config));
+ }
+
+ @Test
+ public void testRequestInboundLimitedStateServiceUnavailable() throws Exception {
+ HttpServerExchange httpServerExchange = mock(HttpServerExchange.class);
+ Sender responseSender = mock(Sender.class);
+ when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
+ when(httpServerExchange.getResponseSender()).thenReturn(responseSender);
+ objUnderTest.setLimited(true);
+ objUnderTest.handleRequest(httpServerExchange);
+ verify(httpServerExchange).setStatusCode(StatusCodes.SERVICE_UNAVAILABLE);
+ }
+
+ @Test
+ public void testRequestInboundLimitedStateServiceNoEmission() throws Exception {
+ HttpServerExchange httpServerExchange = mock(HttpServerExchange.class);
+ Sender responseSender = mock(Sender.class);
+ when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
+ when(httpServerExchange.getResponseSender()).thenReturn(responseSender);
+ objUnderTest.setLimited(true);
+ objUnderTest.handleRequest(httpServerExchange);
+ verify(eventReceiver, times(0)).receive(any());
+ }
+
+
+
+ @Test
+ public void testRequestInboundInvalidMetadata() throws Exception {
+ HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+ JsonObject metadata = new JsonParser().parse(new String(Files.readAllBytes(Paths.get("src/test/resources/invalid_metadata.json")))).getAsJsonObject();
+ when(httpServerExchange.getRequestHeaders().get(any(String.class)).get(anyInt())).thenReturn(metadata.toString());
+ when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
+ objUnderTest.handleRequest(httpServerExchange);
+ verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.BAD_REQUEST);
+ verify(httpServerExchange.getResponseSender(), times(1)).send("Malformed Metadata.");
+
+ }
+
+ @Test
+ public void testRequestInboundNoMetadata() throws Exception{
+ HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+ Receiver receiver = mock(Receiver.class);
+ when(httpServerExchange.getRequestReceiver()).thenReturn(receiver);
+ when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
+
+ doAnswer((Answer<Void>) invocationOnMock -> {
+ Receiver.FullStringCallback callback = invocationOnMock.getArgument(0);
+ callback.handle(httpServerExchange, "");
+ return null;
+ }).when(receiver).receiveFullString(any());
+ doAnswer((Answer<Void>) invocationOnMock -> {
+ Runnable runnable = invocationOnMock.getArgument(0);
+ runnable.run();
+ return null;
+ }).when(httpServerExchange).dispatch(any(Runnable.class));
+ objUnderTest.handleRequest(httpServerExchange);
+ verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.BAD_REQUEST);
+ verify(httpServerExchange.getResponseSender(), times(1)).send("Missing Metadata.");
+
+ }
+
+ @Test
+ public void testRequestInboundSuccess() throws Exception {
+ HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+ Receiver receiver = mock(Receiver.class);
+ when(httpServerExchange.getRequestReceiver()).thenReturn(receiver);
+ String testString = "MESSAGE BODY";
+ JsonObject metadata = new JsonParser().parse(new String(Files.readAllBytes(Paths.get("src/test/resources/valid_metadata.json")))).getAsJsonObject();
+ EventMetadata metadataObj = new GsonBuilder().create().fromJson(metadata, EventMetadata.class);
+
+ when(httpServerExchange.getRequestHeaders().get(any(String.class)).get(anyInt())).thenReturn(metadata.toString());
+ doAnswer((Answer<Void>) invocationOnMock -> {
+ Receiver.FullStringCallback callback = invocationOnMock.getArgument(0);
+ callback.handle(httpServerExchange, testString);
+ return null;
+ }).when(receiver).receiveFullString(any());
+
+ doAnswer((Answer<Void>) invocationOnMock -> {
+ Runnable runnable = invocationOnMock.getArgument(0);
+ runnable.run();
+ return null;
+ }).when(httpServerExchange).dispatch(any(Runnable.class));
+
+ objUnderTest.handleRequest(httpServerExchange);
+ verify(eventReceiver, times(1)).receive(new Event(httpServerExchange, testString, metadataObj));
+ }
+}
diff --git a/src/test/resources/invalid_metadata.json b/src/test/resources/invalid_metadata.json
new file mode 100644
index 0000000..31600b0
--- /dev/null
+++ b/src/test/resources/invalid_metadata.json
@@ -0,0 +1,11 @@
+{
+ "vendorName": "Ericsson",
+ "lastEpochMicrosec": "1538478000000",
+ "sourceName": "oteNB5309",
+ "startEpochMicrosec": "1538478900000",
+ "timeZoneOffset": "UTC+05.00",
+ "location": "ftpes://192.168.0.101:22/ftp/rop/A20161224.1045-1100.bin.gz",
+ "compression": "gzip",
+ "fileFormatType": "org.3GPP.32.435#measCollec",
+ "fileFormatVersion": "V9"
+}
\ No newline at end of file
diff --git a/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000..1f0955d
--- /dev/null
+++ b/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline
diff --git a/src/test/resources/valid_metadata.json b/src/test/resources/valid_metadata.json
new file mode 100644
index 0000000..21de3fb
--- /dev/null
+++ b/src/test/resources/valid_metadata.json
@@ -0,0 +1,12 @@
+{
+ "productName": "NrRadio",
+ "vendorName": "Ericsson",
+ "lastEpochMicrosec": "1538478000000",
+ "sourceName": "oteNB5309",
+ "startEpochMicrosec": "1538478900000",
+ "timeZoneOffset": "UTC+05.00",
+ "location": "ftpes://192.168.0.101:22/ftp/rop/A20161224.1045-1100.bin.gz",
+ "compression": "gzip",
+ "fileFormatType": "org.3GPP.32.435#measCollec",
+ "fileFormatVersion": "V9"
+}
\ No newline at end of file