Add DataRouter Subscriber

Change-Id: I7f81c3d7249dcfcf00e2c7f3272f478d2346397d
Issue-ID: DCAEGEN2-1079
Signed-off-by: JoeOLeary <joseph.o.leary@est.tech>
diff --git a/lombok.config b/lombok.config
new file mode 100644
index 0000000..8f7e8aa
--- /dev/null
+++ b/lombok.config
@@ -0,0 +1 @@
+lombok.addLombokGeneratedAnnotation = true
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..266a8c0
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,207 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ============LICENSE_START=======================================================
+   Copyright (C) 2019 Nordix Foundation.
+  ================================================================================
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+  SPDX-License-Identifier: Apache-2.0
+  ============LICENSE_END=========================================================
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.onap.dcaegen2.services</groupId>
+    <artifactId>pm-mapper</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <properties>
+        <!-- Dependency Versions -->
+        <lombok.version>1.18.4</lombok.version>
+        <sl4j.version>1.7.25</sl4j.version>
+        <logback.version>1.2.3</logback.version>
+        <reactor.version>3.2.3.RELEASE</reactor.version>
+        <undertow.version>2.0.16.Final</undertow.version>
+        <gson.version>2.8.5</gson.version>
+        <!-- Testing.Test Dependencies -->
+        <junit.version>5.3.2</junit.version>
+        <mockito.version>2.23.4</mockito.version>
+        <mockito-ju5-ext.version>2.23.4</mockito-ju5-ext.version>
+        <powermock.version>2.0.0</powermock.version>
+        <junit4.version>4.12</junit4.version>
+        <!-- Plugin Versions -->
+        <shade.plugin.version>3.2.0</shade.plugin.version>
+        <jacoco.version>0.8.2</jacoco.version>
+        <!-- Plugin Settings -->
+        <surefire.version>2.22.0</surefire.version>
+        <compiler.target.version>1.8</compiler.target.version>
+        <compiler.source.version>1.8</compiler.source.version>
+        <shade.main>org.onap.dcaegen2.services.pmmapper.App</shade.main>
+        <shade.transformer>org.apache.maven.plugins.shade.resource.ManifestResourceTransformer</shade.transformer>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+            <version>${reactor.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.undertow</groupId>
+            <artifactId>undertow-core</artifactId>
+            <version>${undertow.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${sl4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>${gson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <version>${mockito-ju5-ext.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.vintage</groupId>
+            <artifactId>junit-vintage-engine</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito2</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>${surefire.version}</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <version>${surefire.version}</version>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${shade.plugin.version}</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <shadedArtifactAttached>true</shadedArtifactAttached>
+                            <transformers>
+                                <transformer implementation="${shade.transformer}">
+                                    <mainClass>${shade.main}</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.jacoco</groupId>
+                <artifactId>jacoco-maven-plugin</artifactId>
+                <version>${jacoco.version}</version>
+                <executions>
+                    <execution>
+                        <id>jacoco-instrument</id>
+                        <goals>
+                            <goal>instrument</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>jacoco-restore-instrumented-classes</id>
+                        <goals>
+                            <goal>restore-instrumented-classes</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <goals>
+                            <goal>prepare-agent</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>report</id>
+                        <phase>prepare-package</phase>
+                        <goals>
+                            <goal>report</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <excludes>
+                        <exclude>**/*App.*</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
new file mode 100644
index 0000000..2b93d03
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -0,0 +1,50 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper;
+
+import io.undertow.Handlers;
+import io.undertow.Undertow;
+import io.undertow.util.StatusCodes;
+import org.onap.dcaegen2.services.pmmapper.config.BusControllerConfig;
+import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber;
+import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+public class App {
+
+    public static void main(String[] args) throws MalformedURLException, InterruptedException, TooManyTriesException {
+        DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(event -> {
+            event.getHttpServerExchange().unDispatch();
+            event.getHttpServerExchange().getResponseSender().send(StatusCodes.OK_STRING);
+            System.out.println(event.getMetadata().getProductName());
+        });
+        BusControllerConfig config = new BusControllerConfig();
+        config.setDataRouterSubscribeEndpoint(new URL("http://" + System.getenv("DMAAP_BC_SERVICE_HOST") + ":" + System.getenv("DMAAP_BC_SERVICE_PORT") + "/webapi/dr_subs"));
+        dataRouterSubscriber.start(config);
+
+        Undertow.builder()
+                .addHttpListener(8081, "0.0.0.0")
+                .setHandler(Handlers.routing().add("put", "/sub", dataRouterSubscriber))
+                .build().start();
+    }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/BusControllerConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/BusControllerConfig.java
new file mode 100644
index 0000000..63b2a32
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/BusControllerConfig.java
@@ -0,0 +1,40 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.config;
+
+import lombok.Data;
+
+import java.net.URL;
+
+/**
+ * Stub for BusControllerConfiguration object.
+ */
+@Data
+public class BusControllerConfig {
+
+    private String dcaeLocation = "dcaeLocation";
+    private String deliveryURL = "deliveryURL";
+    private int feedId = 2;
+    private String lastMod = "lastMod";
+    private String username = "username";
+    private String password = "password";
+    private URL dataRouterSubscribeEndpoint;
+
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
new file mode 100644
index 0000000..1d27d3b
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
@@ -0,0 +1,177 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.datarouter;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import io.undertow.util.HeaderValues;
+import lombok.Data;
+import lombok.NonNull;
+import org.onap.dcaegen2.services.pmmapper.config.BusControllerConfig;
+import org.onap.dcaegen2.services.pmmapper.exceptions.NoMetadataException;
+import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
+import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.StatusCodes;
+
+import lombok.extern.slf4j.Slf4j;
+import org.onap.dcaegen2.services.pmmapper.utils.RequiredFieldDeserializer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.net.HttpURLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.Random;
+
+/**
+ * Subscriber for events sent from data router
+ * Provides an undertow HttpHandler to be used as an endpoint for data router to send events to.
+ */
+@Slf4j
+@Data
+public class DataRouterSubscriber implements HttpHandler {
+
+    private static final int NUMBER_OF_ATTEMPTS = 5;
+    private static final int DEFAULT_TIMEOUT = 2000;
+    private static final int MAX_JITTER = 50;
+
+    private static final String METADATA_HEADER = "X-ATT-DR-META";
+    private static final String BAD_METADATA_MESSAGE = "Malformed Metadata.";
+    private static final String NO_METADATA_MESSAGE = "Missing Metadata.";
+
+    private boolean limited = false;
+    private Random jitterGenerator;
+    private Gson metadataBuilder;
+    @NonNull
+    private EventReceiver eventReceiver;
+
+    /**
+     * @param eventReceiver receiver for any inbound events.
+     */
+    public DataRouterSubscriber(EventReceiver eventReceiver) {
+        this.eventReceiver = eventReceiver;
+        this.jitterGenerator = new Random();
+        this.metadataBuilder = new GsonBuilder().registerTypeAdapter(EventMetadata.class, new RequiredFieldDeserializer<EventMetadata>())
+                .create();
+    }
+
+    /**
+     * Starts data flow by subscribing to data router through bus controller.
+     *
+     * @param config configuration object containing bus controller endpoint for subscription and
+     *               all non constant configuration for subscription through this endpoint.
+     * @throws TooManyTriesException in the event that timeout has occurred several times.
+     */
+    public void start(BusControllerConfig config) throws TooManyTriesException, InterruptedException {
+        subscribe(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, config);
+    }
+
+    private HttpURLConnection getBusControllerConnection(BusControllerConfig config, int timeout) throws IOException {
+        HttpURLConnection connection = (HttpURLConnection) config.getDataRouterSubscribeEndpoint()
+                .openConnection();
+        connection.setRequestMethod("POST");
+        connection.setConnectTimeout(timeout);
+        connection.setReadTimeout(timeout);
+        connection.setRequestProperty("Content-Type", "application/json");
+        connection.setDoOutput(true);
+        return connection;
+    }
+
+    private JsonObject getBusControllerSubscribeBody(BusControllerConfig config) {
+        JsonObject subscriberObj = new JsonObject();
+        subscriberObj.addProperty("dcaeLocationName", config.getDcaeLocation());
+        subscriberObj.addProperty("deliveryURL", config.getDeliveryURL());
+        subscriberObj.addProperty("feedId", config.getFeedId());
+        subscriberObj.addProperty("lastMod", config.getLastMod());
+        subscriberObj.addProperty("username", config.getUsername());
+        subscriberObj.addProperty("userpwd", config.getPassword());
+        return subscriberObj;
+    }
+
+    private void subscribe(int attempts, int timeout, BusControllerConfig config) throws TooManyTriesException, InterruptedException {
+        int subResponse = 504;
+        String subMessage = "";
+        try {
+            HttpURLConnection connection = getBusControllerConnection(config, timeout);
+
+            try (OutputStream bodyStream = connection.getOutputStream();
+                 OutputStreamWriter bodyWriter = new OutputStreamWriter(bodyStream, StandardCharsets.UTF_8)) {
+                bodyWriter.write(getBusControllerSubscribeBody(config).toString());
+            }
+            subResponse = connection.getResponseCode();
+            subMessage = connection.getResponseMessage();
+        } catch (IOException e) {
+            log.info("Timeout Failure:", e);
+        }
+        log.info("Request to bus controller executed with Response Code: '{}' and Response Event: '{}'.", subResponse, subMessage);
+        if (subResponse >= 300 && attempts > 1) {
+            Thread.sleep(timeout);
+            subscribe(--attempts, (timeout * 2) + jitterGenerator.nextInt(MAX_JITTER), config);
+        } else if (subResponse >= 300) {
+            throw new TooManyTriesException("Failed to subscribe within appropriate amount of attempts");
+        }
+    }
+
+    /**
+     * Receives inbound requests, verifies that required headers are valid
+     * and passes an Event onto the eventReceiver.
+     * The forwarded httpServerExchange response is the responsibility of the eventReceiver.
+     *
+     * @param httpServerExchange inbound http server exchange.
+     */
+    @Override
+    public void handleRequest(HttpServerExchange httpServerExchange) {
+        if (limited) {
+            httpServerExchange.setStatusCode(StatusCodes.SERVICE_UNAVAILABLE)
+                    .getResponseSender()
+                    .send(StatusCodes.SERVICE_UNAVAILABLE_STRING);
+        } else {
+            try {
+                String metadataAsString = Optional.of(httpServerExchange.getRequestHeaders()
+                        .get(METADATA_HEADER))
+                        .map((HeaderValues headerValues) -> headerValues.get(0))
+                        .orElseThrow(() -> new NoMetadataException("Metadata Not found"));
+
+                EventMetadata metadata = metadataBuilder.fromJson(metadataAsString, EventMetadata.class);
+                httpServerExchange.getRequestReceiver()
+                        .receiveFullString((callbackExchange, body) -> {
+                            httpServerExchange.dispatch(() -> eventReceiver.receive(new Event(callbackExchange, body, metadata)));
+                        });
+            } catch (NoMetadataException exception) {
+                log.info("Bad Request: no metadata found under '{}' header.", METADATA_HEADER, exception);
+                httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST)
+                        .getResponseSender()
+                        .send(NO_METADATA_MESSAGE);
+            } catch (JsonParseException exception) {
+                log.info("Bad Request: Failure to parse metadata", exception);
+                httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST)
+                        .getResponseSender()
+                        .send(BAD_METADATA_MESSAGE);
+            }
+        }
+    }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/EventReceiver.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/EventReceiver.java
new file mode 100644
index 0000000..77c8153
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/EventReceiver.java
@@ -0,0 +1,31 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.datarouter;
+
+
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+
+/**
+ * Sink for Events received from the data router subscriber.
+ */
+public interface EventReceiver {
+    void receive(Event event);
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/NoMetadataException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/NoMetadataException.java
new file mode 100644
index 0000000..280b9da
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/NoMetadataException.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.exceptions;
+
+public class NoMetadataException extends Exception {
+    public NoMetadataException(String errorMessage) {
+        super(errorMessage);
+    }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/TooManyTriesException.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/TooManyTriesException.java
new file mode 100644
index 0000000..922239b
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/exceptions/TooManyTriesException.java
@@ -0,0 +1,29 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.exceptions;
+
+/**
+ * Exception indicates that a task has been attempted too many times.
+ */
+public class TooManyTriesException extends Exception {
+    public TooManyTriesException(String errorMessage){
+        super(errorMessage);
+    }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
new file mode 100644
index 0000000..a08dcfb
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/Event.java
@@ -0,0 +1,37 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.model;
+
+import io.undertow.server.HttpServerExchange;
+import lombok.Data;
+import lombok.NonNull;
+
+/**
+ * Class used to pass around relevant inbound event data.
+ */
+@Data
+public class Event {
+    @NonNull
+    private HttpServerExchange httpServerExchange;
+    @NonNull
+    private String body;
+    @NonNull
+    private EventMetadata metadata;
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
new file mode 100644
index 0000000..601b00f
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
@@ -0,0 +1,50 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.model;
+
+import lombok.Data;
+import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired;
+
+/**
+ * Metadata for inbound event onto data router subscriber.
+ */
+@Data
+public class EventMetadata {
+    @GSONRequired
+    private String productName;
+    @GSONRequired
+    private String vendorName;
+    @GSONRequired
+    private String startEpochMicrosec;
+    @GSONRequired
+    private String lastEpochMicrosec;
+    @GSONRequired
+    private String sourceName;
+    @GSONRequired
+    private String timeZoneOffset;
+    @GSONRequired
+    private String location;
+    @GSONRequired
+    private String compression;
+    @GSONRequired
+    private String fileFormatType;
+    @GSONRequired
+    private String fileFormatVersion;
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/GSONRequired.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/GSONRequired.java
new file mode 100644
index 0000000..a6ce7b9
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/GSONRequired.java
@@ -0,0 +1,33 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation used to make a field required for Gson.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface GSONRequired {
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java
new file mode 100644
index 0000000..e956398
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java
@@ -0,0 +1,57 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.utils;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Type;
+
+
+/**
+ * Extension of the default deserializer with support for GSONRequired annotation.
+ * @param <T> Type of object for deserialization process.
+ */
+public class RequiredFieldDeserializer<T> implements JsonDeserializer<T> {
+
+    @Override
+    public T deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
+        T obj = new Gson().fromJson(jsonElement, type);
+        for (Field field : obj.getClass().getDeclaredFields()) {
+            if (field.getAnnotation(GSONRequired.class) != null) {
+                field.setAccessible(true);
+                try {
+                    if (field.get(obj) == null) {
+                        throw new JsonParseException(String.format("Field: '%s', is required but not found.", field.getName()));
+                    }
+                } catch (Exception exception) {
+                    throw new JsonParseException("Failed to check fields.", exception);
+                }
+            }
+        }
+
+        return obj;
+    }
+
+}
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java
new file mode 100644
index 0000000..8f73c91
--- /dev/null
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java
@@ -0,0 +1,225 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.pmmapper.datarouter;
+
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.onap.dcaegen2.services.pmmapper.config.BusControllerConfig;
+import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
+import io.undertow.io.Receiver;
+import io.undertow.io.Sender;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.StatusCodes;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DataRouterSubscriber.class)
+public class DataRouterSubscriberTest {
+
+
+    @Mock
+    private EventReceiver eventReceiver;
+
+    private DataRouterSubscriber objUnderTest;
+
+    @Before
+    public void setUp() {
+        objUnderTest = new DataRouterSubscriber(eventReceiver);
+    }
+
+    @Test
+    public void testStartTooManyTriesWithResponse() throws IOException {
+        PowerMockito.mockStatic(Thread.class);
+
+        URL subURL = mock(URL.class);
+        BusControllerConfig config = new BusControllerConfig();
+        config.setDataRouterSubscribeEndpoint(subURL);
+        HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+        when(subURL.openConnection()).thenReturn(huc);
+        when(huc.getResponseCode()).thenReturn(300);
+        Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start(config));
+    }
+
+    @Test
+    public void testStartImmediateSuccess() throws IOException, TooManyTriesException, InterruptedException {
+        URL subURL = mock(URL.class);
+        BusControllerConfig config = new BusControllerConfig();
+        config.setDataRouterSubscribeEndpoint(subURL);
+        HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+        when(subURL.openConnection()).thenReturn(huc);
+        when(huc.getResponseCode()).thenReturn(200);
+        objUnderTest.start(config);
+        verify(huc, times(1)).getResponseCode();
+    }
+
+    @Test
+    public void testStartDelayedSuccess() throws IOException, TooManyTriesException, InterruptedException {
+        PowerMockito.mockStatic(Thread.class);
+
+        URL subURL = mock(URL.class);
+        BusControllerConfig config = new BusControllerConfig();
+        config.setDataRouterSubscribeEndpoint(subURL);
+        HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+        when(subURL.openConnection()).thenReturn(huc);
+        doAnswer(new Answer() {
+            boolean forceRetry = true;
+
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                if (forceRetry) {
+                    forceRetry = false;
+                    throw new IOException();
+                }
+                return 200;
+            }
+        }).when(huc).getResponseCode();
+        objUnderTest.start(config);
+        verify(huc, times(2)).getResponseCode();
+    }
+
+    @Test
+    public void testStartReadTimeout() throws IOException {
+        PowerMockito.mockStatic(Thread.class);
+
+        URL subURL = mock(URL.class);
+        BusControllerConfig config = new BusControllerConfig();
+        config.setDataRouterSubscribeEndpoint(subURL);
+        HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
+        when(subURL.openConnection()).thenReturn(huc);
+        doThrow(new IOException()).when(huc).getResponseCode();
+        Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start(config));
+    }
+
+    @Test
+    public void testRequestInboundLimitedStateServiceUnavailable() throws Exception {
+        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class);
+        Sender responseSender = mock(Sender.class);
+        when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
+        when(httpServerExchange.getResponseSender()).thenReturn(responseSender);
+        objUnderTest.setLimited(true);
+        objUnderTest.handleRequest(httpServerExchange);
+        verify(httpServerExchange).setStatusCode(StatusCodes.SERVICE_UNAVAILABLE);
+    }
+
+    @Test
+    public void testRequestInboundLimitedStateServiceNoEmission() throws Exception {
+        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class);
+        Sender responseSender = mock(Sender.class);
+        when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
+        when(httpServerExchange.getResponseSender()).thenReturn(responseSender);
+        objUnderTest.setLimited(true);
+        objUnderTest.handleRequest(httpServerExchange);
+        verify(eventReceiver, times(0)).receive(any());
+    }
+
+
+
+    @Test
+    public void testRequestInboundInvalidMetadata() throws Exception {
+        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+        JsonObject metadata = new JsonParser().parse(new String(Files.readAllBytes(Paths.get("src/test/resources/invalid_metadata.json")))).getAsJsonObject();
+        when(httpServerExchange.getRequestHeaders().get(any(String.class)).get(anyInt())).thenReturn(metadata.toString());
+        when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
+        objUnderTest.handleRequest(httpServerExchange);
+        verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.BAD_REQUEST);
+        verify(httpServerExchange.getResponseSender(), times(1)).send("Malformed Metadata.");
+
+    }
+
+    @Test
+    public void testRequestInboundNoMetadata() throws Exception{
+        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+        Receiver receiver = mock(Receiver.class);
+        when(httpServerExchange.getRequestReceiver()).thenReturn(receiver);
+        when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
+
+        doAnswer((Answer<Void>) invocationOnMock -> {
+            Receiver.FullStringCallback callback = invocationOnMock.getArgument(0);
+            callback.handle(httpServerExchange, "");
+            return null;
+        }).when(receiver).receiveFullString(any());
+        doAnswer((Answer<Void>) invocationOnMock -> {
+            Runnable runnable = invocationOnMock.getArgument(0);
+            runnable.run();
+            return null;
+        }).when(httpServerExchange).dispatch(any(Runnable.class));
+        objUnderTest.handleRequest(httpServerExchange);
+        verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.BAD_REQUEST);
+        verify(httpServerExchange.getResponseSender(), times(1)).send("Missing Metadata.");
+
+    }
+
+    @Test
+    public void testRequestInboundSuccess() throws Exception {
+        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+        Receiver receiver = mock(Receiver.class);
+        when(httpServerExchange.getRequestReceiver()).thenReturn(receiver);
+        String testString = "MESSAGE BODY";
+        JsonObject metadata = new JsonParser().parse(new String(Files.readAllBytes(Paths.get("src/test/resources/valid_metadata.json")))).getAsJsonObject();
+        EventMetadata metadataObj = new GsonBuilder().create().fromJson(metadata, EventMetadata.class);
+
+        when(httpServerExchange.getRequestHeaders().get(any(String.class)).get(anyInt())).thenReturn(metadata.toString());
+        doAnswer((Answer<Void>) invocationOnMock -> {
+            Receiver.FullStringCallback callback = invocationOnMock.getArgument(0);
+            callback.handle(httpServerExchange, testString);
+            return null;
+        }).when(receiver).receiveFullString(any());
+
+        doAnswer((Answer<Void>) invocationOnMock -> {
+            Runnable runnable = invocationOnMock.getArgument(0);
+            runnable.run();
+            return null;
+        }).when(httpServerExchange).dispatch(any(Runnable.class));
+
+        objUnderTest.handleRequest(httpServerExchange);
+        verify(eventReceiver, times(1)).receive(new Event(httpServerExchange, testString, metadataObj));
+    }
+}
diff --git a/src/test/resources/invalid_metadata.json b/src/test/resources/invalid_metadata.json
new file mode 100644
index 0000000..31600b0
--- /dev/null
+++ b/src/test/resources/invalid_metadata.json
@@ -0,0 +1,11 @@
+{
+  "vendorName": "Ericsson",
+  "lastEpochMicrosec": "1538478000000",
+  "sourceName": "oteNB5309",
+  "startEpochMicrosec": "1538478900000",
+  "timeZoneOffset": "UTC+05.00",
+  "location": "ftpes://192.168.0.101:22/ftp/rop/A20161224.1045-1100.bin.gz",
+  "compression": "gzip",
+  "fileFormatType": "org.3GPP.32.435#measCollec",
+  "fileFormatVersion": "V9"
+}
\ No newline at end of file
diff --git a/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000..1f0955d
--- /dev/null
+++ b/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline
diff --git a/src/test/resources/valid_metadata.json b/src/test/resources/valid_metadata.json
new file mode 100644
index 0000000..21de3fb
--- /dev/null
+++ b/src/test/resources/valid_metadata.json
@@ -0,0 +1,12 @@
+{
+  "productName": "NrRadio",
+  "vendorName": "Ericsson",
+  "lastEpochMicrosec": "1538478000000",
+  "sourceName": "oteNB5309",
+  "startEpochMicrosec": "1538478900000",
+  "timeZoneOffset": "UTC+05.00",
+  "location": "ftpes://192.168.0.101:22/ftp/rop/A20161224.1045-1100.bin.gz",
+  "compression": "gzip",
+  "fileFormatType": "org.3GPP.32.435#measCollec",
+  "fileFormatVersion": "V9"
+}
\ No newline at end of file