Merge "Fix issue with filtering multiple measValue"
diff --git a/dpo/blueprints/k8s-pm-mapper.yaml b/dpo/blueprints/k8s-pm-mapper.yaml
index 0944da3..cd52e76 100644
--- a/dpo/blueprints/k8s-pm-mapper.yaml
+++ b/dpo/blueprints/k8s-pm-mapper.yaml
@@ -25,10 +25,6 @@
   - "https://nexus.onap.org/service/local/repositories/raw/content/org.onap.dcaegen2.platform.plugins/R4/k8splugin/1.4.5/k8splugin_types.yaml"
 
 inputs:
-  service_name:
-    type: string
-    description: Name of the serice
-    default: "dcae-pm-mapper"
   tag_version:
     type: string
     description: Docker image to be used
@@ -48,7 +44,7 @@
   client_role:
     type: string
     description: Client role to request secure access to topic
-    default: "ves-publisher"
+    default: "org.onap.dmaap.mr.PM_MAPPER.pub"
   client_id:
     type: string
     description: Client id for given AAF client
@@ -64,11 +60,7 @@
   dcae_location:
     type: string
     description: DCAE location for the subscriber, used to set up routing
-    default: "location"
-  subscriber_id:
-    type: string
-    description: Subscriber id in Data Router
-    default: ""
+    default: "san-francisco"
   pm_mapper_service_protocol:
     type: string
     description: PM Mapper protocol
@@ -77,18 +69,6 @@
     type: string
     description: PM Mapper host port
     default: "8443"
-  dmaap_buscontroller_service_host:
-    type: string
-    description: DMAAP Bus Controller host address
-    default: "dmaap-bc.onap.svc.cluster.local"
-  dmaap_buscontroller_service_port:
-    type: string
-    description: DMAAP Bus Controller host port
-    default: "8080"
-  dmaap_dr_feed_id:
-    type: string
-    description: ID of the Data Router feed that the PM Mapper will subscribe to
-    default: "1"
   dmaap_dr_service_host:
     type: string
     description: DMAAP Data Router host address
@@ -105,10 +85,6 @@
     type: string
     description: DMAAP Message Router host port
     default: "3905"
-  dmaap_mr_topic_name:
-    type: string
-    description: Name of Message Router topic events will be published to
-    default: "pm-mapper-ves"
   filter:
     type: string
     description: PM Mapper filter on measInfo, measInfoId, measType, instanceId
@@ -140,9 +116,10 @@
                      ":", { get_input: dmaap_buscontroller_service_port}, "/webapi/dr_subs"]}
         dmaap_dr_feed_id:
           get_input: dmaap_dr_feed_id
+        dmaap_dr_feed_name: "bulk_pm_feed"
         dmaap_dr_delete_endpoint:
           { concat: ["https://", { get_input: dmaap_dr_service_host },
-                     ":", { get_input: dmaap_dr_service_port}, "/delete"]}
+                     ":", { get_input: dmaap_dr_service_port},"/delete"]}
         pm-mapper-filter:
           get_input: filter
         streams_subscribes:
@@ -156,11 +133,9 @@
                 get_input: dmaap_dr_password
               location:
                 get_input: dcae_location
-              subscriber_id:
-                get_input: subscriber_id
+              subscriber_id: "1"
               delivery_url:
-                { concat: [{ get_input: pm_mapper_service_protocol },"://", { get_input: service_name }, ".onap.svc.cluster.local",
-                           ":", { get_input: pm_mapper_service_port }, "/delivery"]}
+                { concat: [{ get_input: pm_mapper_service_protocol },"://dcae-pm-mapper:",{ get_input: pm_mapper_service_port },"/delivery"]}
         streams_publishes:
           dmaap_publisher:
             aaf_username:
@@ -175,8 +150,8 @@
               client_id:
                 get_input: client_id
               topic_url:
-                { concat: ["https://", { get_input: dmaap_mr_service_host },
-                           ":", { get_input: dmaap_mr_service_port }, "/events/", { get_input: dmaap_mr_topic_name }]}
+                { concat: [{ get_input: pm_mapper_service_protocol },"://",{ get_input: dmaap_mr_service_host },
+                           ":",{ get_input: dmaap_mr_service_port },"/events/PM_MAPPER"]}
               location:
                 get_input: dcae_location
       docker_config:
@@ -188,8 +163,8 @@
       image:
         get_input: tag_version
       replicas: { get_input: replicas }
-      name: { get_input: service_name }
-      dns_name: { get_input: service_name }
+      name: "dcae-pm-mapper"
+      dns_name: "dcae-pm-mapper"
       log_info:
         log_directory: "/var/log/ONAP/dcaegen2/services/pm-mapper"
       tls_info:
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
index 25e3918..a5eb68d 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/App.java
@@ -28,13 +28,11 @@
 import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler;
 import org.onap.dcaegen2.services.pmmapper.config.Configurable;
 import org.onap.dcaegen2.services.pmmapper.config.DynamicConfiguration;
-import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber;
-import org.onap.dcaegen2.services.pmmapper.exceptions.CBSConfigException;
+import org.onap.dcaegen2.services.pmmapper.datarouter.DeliveryHandler;
 import org.onap.dcaegen2.services.pmmapper.exceptions.CBSServerError;
 import org.onap.dcaegen2.services.pmmapper.exceptions.EnvironmentConfigException;
 import org.onap.dcaegen2.services.pmmapper.exceptions.MapperConfigException;
 import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException;
-import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
 import org.onap.dcaegen2.services.pmmapper.filtering.MetadataFilter;
 import org.onap.dcaegen2.services.pmmapper.filtering.MeasFilterHandler;
 import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
@@ -67,7 +65,7 @@
     private static Path xmlSchema = Paths.get("/opt/app/pm-mapper/etc/measCollec_plusString.xsd");
     private static FluxSink<Event> fluxSink;
 
-    public static void main(String[] args) throws InterruptedException, TooManyTriesException, CBSConfigException, EnvironmentConfigException, CBSServerError, MapperConfigException, IOException {
+    public static void main(String[] args) throws EnvironmentConfigException, CBSServerError, MapperConfigException, IOException {
         Flux<Event> flux = Flux.create(eventFluxSink -> fluxSink = eventFluxSink);
         HealthCheckHandler healthCheckHandler = new HealthCheckHandler();
         MapperConfig mapperConfig = new ConfigHandler().getMapperConfig();
@@ -86,18 +84,16 @@
                 .runOn(Schedulers.newParallel(""), 1)
                 .doOnNext(event -> MDC.setContextMap(event.getMdc()))
                 .filter(metadataFilter::filter)
-                .filter(filterHandler::filterByFileType)
-                .filter(validator::validate)
+                .filter(event -> App.filterByFileType(filterHandler, event, mapperConfig))
+                .filter(event -> App.validate(validator, event, mapperConfig))
                 .concatMap(event -> App.split(splitter,event, mapperConfig))
                 .filter(events -> App.filter(filterHandler, events, mapperConfig))
                 .concatMap(events -> App.map(mapper, events, mapperConfig))
                 .concatMap(vesPublisher::publish)
                 .subscribe(event -> App.sendEventProcessed(mapperConfig, event));
 
-        DataRouterSubscriber dataRouterSubscriber = new DataRouterSubscriber(fluxSink::next, mapperConfig);
-        dataRouterSubscriber.start();
+        DeliveryHandler deliveryHandler = new DeliveryHandler(fluxSink::next);
         ArrayList<Configurable> configurables = new ArrayList<>();
-        configurables.add(dataRouterSubscriber);
         configurables.add(mapperConfig);
         DynamicConfiguration dynamicConfiguration = new DynamicConfiguration(configurables, mapperConfig);
 
@@ -113,12 +109,40 @@
 
         builder.addHttpsListener(8443, "0.0.0.0", sslContext)
                 .setHandler(Handlers.routing()
-                        .add("put", "/delivery/{filename}", dataRouterSubscriber)
+                        .add("put", "/delivery/{filename}", deliveryHandler)
                         .add("get", "/healthcheck", healthCheckHandler)
                         .add("get", "/reconfigure", dynamicConfiguration))
                 .build().start();
     }
 
+    public static boolean filterByFileType(MeasFilterHandler filterHandler,Event event, MapperConfig config) {
+        boolean hasValidFileName = false;
+        try {
+            hasValidFileName = filterHandler.filterByFileType(event);
+            if(!hasValidFileName) {
+                sendEventProcessed(config,event);
+            }
+        } catch (Exception exception) {
+            logger.unwrap().error("Unable to filter by file type", exception);
+            sendEventProcessed(config,event);
+        }
+        return hasValidFileName;
+    }
+
+    public static boolean validate(XMLValidator validator, Event event, MapperConfig config) {
+        boolean isValidXML = false;
+        try {
+            isValidXML = validator.validate(event);
+            if(!isValidXML) {
+                sendEventProcessed(config,event);
+            }
+        } catch (Exception exception) {
+            logger.unwrap().error("Unable to validate XML",exception);
+            sendEventProcessed(config,event);
+        }
+        return isValidXML;
+    }
+
     public static boolean filter(MeasFilterHandler filterHandler, List<Event> events, MapperConfig config) {
         Event event = events.get(0);
         boolean hasMatchingFilter = false;
@@ -128,7 +152,7 @@
                 sendEventProcessed(config,event);
             }
         } catch (Exception exception) {
-            logger.unwrap().error(exception.getMessage(),exception);
+            logger.unwrap().error("Unable to filter by Meas Types",exception);
             sendEventProcessed(config,event);
         }
         return hasMatchingFilter;
@@ -139,7 +163,7 @@
         try {
             mappedEvents = mapper.mapEvents(events);
         } catch (Exception exception) {
-            logger.unwrap().error(exception.getMessage(),exception);
+            logger.unwrap().error("Unable to map XML to VES",exception);
             sendEventProcessed(config,events.get(0));
             return Flux.<List<Event>>empty();
         }
@@ -151,7 +175,7 @@
         try {
             splitEvents = splitter.split(event);
         } catch (Exception exception) {
-            logger.unwrap().error(exception.getMessage(),exception);
+            logger.unwrap().error("Unable to split MeasCollecFile",exception);
             sendEventProcessed(config,event);
             return Flux.<List<Event>>empty();
         }
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/ConfigHandler.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/ConfigHandler.java
index fe2f247..e50ec6c 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/config/ConfigHandler.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/config/ConfigHandler.java
@@ -59,14 +59,14 @@
 

     /**

      * Retrieves PM-Mapper Configuration from DCAE's ConfigBinding Service.

-     *
+     *

      * @throws EnvironmentConfigException

      * @throws ConsulServerError

      * @throws CBSConfigException

      * @throws CBSServerError

      * @throws MapperConfigException

-     */
-    public MapperConfig getMapperConfig() throws CBSConfigException, EnvironmentConfigException,

+     */

+    public MapperConfig getMapperConfig() throws EnvironmentConfigException,

             CBSServerError, MapperConfigException {

         String mapperConfigJson = "";

         String cbsSocketAddress = EnvironmentConfig.getCBSHostName() + ":" + EnvironmentConfig.getCBSPort();

diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
deleted file mode 100644
index a0a8eaf..0000000
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriber.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- *  Copyright (C) 2019 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.pmmapper.datarouter;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
-import io.undertow.util.HeaderValues;
-import lombok.Data;
-import lombok.NonNull;
-
-import org.onap.dcaegen2.services.pmmapper.config.Configurable;
-import org.onap.dcaegen2.services.pmmapper.exceptions.NoMetadataException;
-import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException;
-import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
-import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
-import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
-import org.onap.dcaegen2.services.pmmapper.model.Event;
-import io.undertow.server.HttpHandler;
-import io.undertow.server.HttpServerExchange;
-import io.undertow.util.StatusCodes;
-
-import org.onap.dcaegen2.services.pmmapper.utils.HttpServerExchangeAdapter;
-import org.onap.dcaegen2.services.pmmapper.utils.RequiredFieldDeserializer;
-import org.onap.logging.ref.slf4j.ONAPLogAdapter;
-import org.onap.logging.ref.slf4j.ONAPLogConstants;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.time.Instant;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Random;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-/**
- * Subscriber for events sent from data router
- * Provides an undertow HttpHandler to be used as an endpoint for data router to send events to.
- */
-@Data
-public class DataRouterSubscriber implements HttpHandler, Configurable {
-    public static final String METADATA_HEADER = "X-DMAAP-DR-META";
-    public static final String PUB_ID_HEADER = "X-DMAAP-DR-PUBLISH-ID";
-
-    private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DataRouterSubscriber.class));
-    private static final int NUMBER_OF_ATTEMPTS = 5;
-    private static final int DEFAULT_TIMEOUT = 2000;
-    private static final int MAX_JITTER = 50;
-
-    private static final String BAD_METADATA_MESSAGE = "Malformed Metadata.";
-    private static final String NO_METADATA_MESSAGE = "Missing Metadata.";
-
-    private boolean limited = false;
-    private Random jitterGenerator;
-    private Gson metadataBuilder;
-    private MapperConfig config;
-    public static String subscriberId;
-    @NonNull
-    private EventReceiver eventReceiver;
-
-    /**
-     * @param eventReceiver receiver for any inbound events.
-     */
-    public DataRouterSubscriber(EventReceiver eventReceiver, MapperConfig config) {
-        this.eventReceiver = eventReceiver;
-        this.jitterGenerator = new Random();
-        this.metadataBuilder = new GsonBuilder().registerTypeAdapter(EventMetadata.class, new RequiredFieldDeserializer<EventMetadata>())
-                .create();
-        this.config = config;
-        this.subscriberId="";
-    }
-
-    /**
-     * Starts data flow by subscribing to data router through bus controller.
-     *
-     * @throws TooManyTriesException in the event that timeout has occurred several times.
-     */
-    public void start() throws TooManyTriesException, InterruptedException {
-        try {
-            logger.unwrap().info("Starting subscription to DataRouter {}", ONAPLogConstants.Markers.ENTRY);
-            subscribe();
-            logger.unwrap().info("Successfully started DR Subscriber");
-        } finally {
-            logger.unwrap().info("{}", ONAPLogConstants.Markers.EXIT);
-        }
-    }
-
-    private HttpURLConnection getBusControllerConnection(String method, URL resource, int timeout) throws IOException {
-        HttpURLConnection connection = (HttpURLConnection) resource.openConnection();
-        connection.setRequestMethod(method);
-        connection.setConnectTimeout(timeout);
-        connection.setReadTimeout(timeout);
-        connection.setRequestProperty("Content-Type", "application/json");
-        connection.setDoOutput(true);
-
-        final UUID invocationID = logger.invoke(ONAPLogConstants.InvocationMode.SYNCHRONOUS);
-        final UUID requestID = UUID.randomUUID();
-        connection.setRequestProperty(ONAPLogConstants.Headers.REQUEST_ID, requestID.toString());
-        connection.setRequestProperty(ONAPLogConstants.Headers.INVOCATION_ID, invocationID.toString());
-        connection.setRequestProperty(ONAPLogConstants.Headers.PARTNER_NAME, MapperConfig.CLIENT_NAME);
-
-        return connection;
-    }
-
-    private JsonObject getBusControllerSubscribeBody(MapperConfig config) {
-        JsonObject subscriberObj = new JsonObject();
-        subscriberObj.addProperty("dcaeLocationName", config.getSubscriberDcaeLocation());
-        subscriberObj.addProperty("deliveryURL", config.getBusControllerDeliveryUrl());
-        subscriberObj.addProperty("feedId", config.getDmaapDRFeedId());
-        subscriberObj.addProperty("lastMod", Instant.now().toString());
-        subscriberObj.addProperty("username", config.getBusControllerUserName());
-        subscriberObj.addProperty("userpwd", config.getBusControllerPassword());
-        subscriberObj.addProperty("privilegedSubscriber", true);
-        return subscriberObj;
-    }
-
-    private void processResponse(HttpURLConnection connection) throws IOException {
-        try (BufferedReader responseBody = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
-            String body = responseBody.lines().collect(Collectors.joining(""));
-            updateSubscriberId(body);
-        } catch (IOException | JsonSyntaxException | IllegalStateException e) {
-            throw new IOException("Failed to process response", e);
-        }
-    }
-
-    private void updateSubscriberId(String responseBody) {
-            JsonParser parser = new JsonParser();
-            JsonObject responseObject = parser.parse(responseBody).getAsJsonObject();
-            this.subscriberId = responseObject.get("subId").getAsString();
-    }
-
-    private void subscribe() throws TooManyTriesException, InterruptedException {
-        try {
-            URL subscribeResource = this.config.getBusControllerSubscriptionUrl();
-            JsonObject subscribeBody = this.getBusControllerSubscribeBody(this.config);
-            request(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, "POST", subscribeResource, subscribeBody);
-        } catch (MalformedURLException e) {
-            throw new IllegalStateException("Subscription URL is malformed", e);
-        }
-
-    }
-    private void updateSubscriber() throws TooManyTriesException, InterruptedException {
-        try {
-            URL subscribeResource = this.config.getBusControllerSubscriptionUrl();
-            URL updateResource = new URL(String.format("%s/%s", subscribeResource, subscriberId));
-            JsonObject subscribeBody = this.getBusControllerSubscribeBody(this.config);
-            request(NUMBER_OF_ATTEMPTS, DEFAULT_TIMEOUT, "PUT", updateResource, subscribeBody);
-        } catch (MalformedURLException e) {
-            throw new IllegalStateException("Subscription URL is malformed", e);
-        }
-    }
-
-    private void request(int attempts, int timeout, String method, URL resource, JsonObject subscribeBody) throws TooManyTriesException, InterruptedException {
-        int subResponse = 504;
-        String subMessage = "";
-        boolean processFailure = false;
-        try {
-            HttpURLConnection connection = getBusControllerConnection(method, resource, timeout);
-            try (OutputStream bodyStream = connection.getOutputStream();
-                 OutputStreamWriter bodyWriter = new OutputStreamWriter(bodyStream, StandardCharsets.UTF_8)) {
-                bodyWriter.write(subscribeBody.toString());
-            }
-            subResponse = connection.getResponseCode();
-            subMessage = connection.getResponseMessage();
-            if (subResponse < 300) {
-                processResponse(connection);
-            }
-        } catch (IOException e) {
-            logger.unwrap().error("Failure to process response", e);
-            processFailure = true;
-        }
-        logger.unwrap().info("Request to bus controller executed with Response Code: '{}' and Response Event: '{}'.", subResponse, subMessage);
-        if ((subResponse >= 300 || processFailure) && attempts > 1 ) {
-            Thread.sleep(timeout);
-            request(--attempts, (timeout * 2) + jitterGenerator.nextInt(MAX_JITTER), method, resource, subscribeBody);
-        } else if (subResponse >= 300 || processFailure) {
-            throw new TooManyTriesException("Failed to subscribe within appropriate amount of attempts");
-        }
-    }
-
-    private EventMetadata getMetadata(HttpServerExchange httpServerExchange) throws NoMetadataException {
-        String metadata = Optional.ofNullable(httpServerExchange.getRequestHeaders()
-                .get(METADATA_HEADER))
-                .map((HeaderValues headerValues) -> headerValues.get(0))
-                .orElseThrow(() -> new NoMetadataException("Metadata Not found"));
-        return metadataBuilder.fromJson(metadata, EventMetadata.class);
-    }
-
-    /**
-     * Receives inbound requests, verifies that required headers are valid
-     * and passes an Event onto the eventReceiver.
-     * The forwarded httpServerExchange response is the responsibility of the eventReceiver.
-     *
-     * @param httpServerExchange inbound http server exchange.
-     */
-    @Override
-    public void handleRequest(HttpServerExchange httpServerExchange) {
-        try{
-            logger.entering(new HttpServerExchangeAdapter(httpServerExchange));
-            if (limited) {
-                httpServerExchange.setStatusCode(StatusCodes.SERVICE_UNAVAILABLE)
-                        .getResponseSender()
-                        .send(StatusCodes.SERVICE_UNAVAILABLE_STRING);
-            } else {
-                try {
-
-                    Map<String,String> mdc = MDC.getCopyOfContextMap();
-                    EventMetadata metadata = getMetadata(httpServerExchange);
-                    String publishIdentity = httpServerExchange.getRequestHeaders().get(PUB_ID_HEADER).getFirst();
-                    httpServerExchange.getRequestReceiver()
-                            .receiveFullString((callbackExchange, body) ->
-                                httpServerExchange.dispatch(() ->
-                                        eventReceiver.receive(new Event(callbackExchange, body, metadata, mdc, publishIdentity)))
-                            );
-                } catch (NoMetadataException exception) {
-                    logger.unwrap().info("Bad Request: no metadata found under '{}' header.", METADATA_HEADER, exception);
-                    httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST)
-                            .getResponseSender()
-                            .send(NO_METADATA_MESSAGE);
-                } catch (JsonParseException exception) {
-                    logger.unwrap().info("Bad Request: Failure to parse metadata", exception);
-                    httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST)
-                            .getResponseSender()
-                            .send(BAD_METADATA_MESSAGE);
-                }
-            }
-        } finally {
-            logger.exiting();
-        }
-    }
-
-    @Override
-    public void reconfigure(MapperConfig config) throws ReconfigurationException {
-        logger.unwrap().info("Checking new Configuration against existing.");
-        if(!this.config.dmaapInfoEquals(config) || !this.config.getDmaapDRFeedId().equals(config.getDmaapDRFeedId())){
-            logger.unwrap().info("DMaaP Info changes found, reconfiguring.");
-            try {
-                this.config = config;
-                this.updateSubscriber();
-            } catch (TooManyTriesException | InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new ReconfigurationException("Failed to reconfigure DataRouter subscriber.", e);
-            }
-        }
-
-    }
-}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DeliveryHandler.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DeliveryHandler.java
new file mode 100644
index 0000000..4d6af29
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/datarouter/DeliveryHandler.java
@@ -0,0 +1,119 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.datarouter;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParseException;
+import io.undertow.util.HeaderValues;
+import lombok.Data;
+import lombok.NonNull;
+
+import org.onap.dcaegen2.services.pmmapper.exceptions.NoMetadataException;
+import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import io.undertow.server.HttpHandler;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.StatusCodes;
+
+import org.onap.dcaegen2.services.pmmapper.utils.HttpServerExchangeAdapter;
+import org.onap.dcaegen2.services.pmmapper.utils.RequiredFieldDeserializer;
+import org.onap.logging.ref.slf4j.ONAPLogAdapter;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Provides an undertow HttpHandler to be used as an endpoint for data router to send events to.
+ */
+@Data
+public class DeliveryHandler implements HttpHandler {
+
+    public static final String METADATA_HEADER = "X-DMAAP-DR-META";
+    public static final String PUB_ID_HEADER = "X-DMAAP-DR-PUBLISH-ID";
+
+    private static final ONAPLogAdapter logger = new ONAPLogAdapter(LoggerFactory.getLogger(DeliveryHandler.class));
+
+    private static final String BAD_METADATA_MESSAGE = "Malformed Metadata.";
+    private static final String NO_METADATA_MESSAGE = "Missing Metadata.";
+
+    private Gson metadataBuilder;
+
+    @NonNull
+    private EventReceiver eventReceiver;
+
+    /**
+     * @param eventReceiver receiver for any inbound events.
+     */
+    public DeliveryHandler(EventReceiver eventReceiver) {
+        this.eventReceiver = eventReceiver;
+        this.metadataBuilder = new GsonBuilder()
+                .registerTypeAdapter(EventMetadata.class, new RequiredFieldDeserializer<EventMetadata>())
+                .create();
+    }
+
+    private EventMetadata getMetadata(HttpServerExchange httpServerExchange) throws NoMetadataException {
+        String metadata = Optional.ofNullable(httpServerExchange.getRequestHeaders()
+                .get(METADATA_HEADER))
+                .map((HeaderValues headerValues) -> headerValues.get(0))
+                .orElseThrow(() -> new NoMetadataException("Metadata Not found"));
+        return metadataBuilder.fromJson(metadata, EventMetadata.class);
+    }
+
+    /**
+     * Receives inbound requests, verifies that required headers are valid
+     * and passes an Event onto the eventReceiver.
+     * The forwarded httpServerExchange response is the responsibility of the eventReceiver.
+     *
+     * @param httpServerExchange inbound http server exchange.
+     */
+    @Override
+    public void handleRequest(HttpServerExchange httpServerExchange) {
+        try{
+            logger.entering(new HttpServerExchangeAdapter(httpServerExchange));
+            try {
+                Map<String,String> mdc = MDC.getCopyOfContextMap();
+                EventMetadata metadata = getMetadata(httpServerExchange);
+                String publishIdentity = httpServerExchange.getRequestHeaders().get(PUB_ID_HEADER).getFirst();
+                httpServerExchange.getRequestReceiver()
+                        .receiveFullString((callbackExchange, body) ->
+                                httpServerExchange.dispatch(() ->
+                                        eventReceiver.receive(new Event(
+                                                callbackExchange, body, metadata, mdc, publishIdentity)))
+                        );
+            } catch (NoMetadataException exception) {
+                logger.unwrap().info("Bad Request: no metadata found under '{}' header.", METADATA_HEADER, exception);
+                httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST)
+                        .getResponseSender()
+                        .send(NO_METADATA_MESSAGE);
+            } catch (JsonParseException exception) {
+                logger.unwrap().info("Bad Request: Failure to parse metadata", exception);
+                httpServerExchange.setStatusCode(StatusCodes.BAD_REQUEST)
+                        .getResponseSender()
+                        .send(BAD_METADATA_MESSAGE);
+            }
+        } finally {
+            logger.exiting();
+        }
+    }
+}
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java
index 20c8a64..1fb6019 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/filtering/MetadataFilter.java
@@ -22,7 +22,6 @@
 
 import lombok.NonNull;
 import org.onap.dcaegen2.services.pmmapper.exceptions.*;
-import org.onap.dcaegen2.services.pmmapper.mapping.Mapper;
 import org.onap.dcaegen2.services.pmmapper.model.Event;
 import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
 import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
@@ -48,7 +47,6 @@
      * @param event inbound event
      */
     public boolean filter(@NonNull Event event) {
-        String decompressionStatus;
         logger.unwrap().info("Filtering event metadata");
         EventMetadata metadata = event.getMetadata();
 
@@ -56,11 +54,6 @@
 
         List<MeasFilterConfig.Filter> filters = measFilterConfig.getFilters();
 
-        if(metadata.getDecompressionStatus() != null) {
-            decompressionStatus = metadata.getDecompressionStatus();
-            logger.unwrap().debug("Decompression Status: {}", decompressionStatus);
-        }
-
         if(filters.isEmpty()) {
             logger.unwrap().info("No filter specified in config: {}", filters);
             return true;
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
index 8a0977d..601b00f 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/EventMetadata.java
@@ -19,7 +19,6 @@
  */
 package org.onap.dcaegen2.services.pmmapper.model;
 
-import com.google.gson.annotations.SerializedName;
 import lombok.Data;
 import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired;
 
@@ -48,6 +47,4 @@
     private String fileFormatType;
     @GSONRequired
     private String fileFormatVersion;
-    @SerializedName("decompression_status")
-    private String decompressionStatus;
 }
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
index b9d58ee..390fa0d 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MapperConfig.java
@@ -19,9 +19,6 @@
  */
 package org.onap.dcaegen2.services.pmmapper.model;
 
-import java.net.MalformedURLException;
-import java.net.URL;
-
 import org.onap.dcaegen2.services.pmmapper.config.Configurable;
 import org.onap.dcaegen2.services.pmmapper.utils.GSONRequired;
 import com.google.gson.annotations.SerializedName;
@@ -68,14 +65,6 @@
     private StreamsPublishes streamsPublishes;
 
     @GSONRequired
-    @SerializedName("buscontroller_feed_subscription_endpoint")
-    private String busControllerSubscriptionEndpoint;
-
-    @GSONRequired
-    @SerializedName("dmaap_dr_feed_id")
-    private String dmaapDRFeedId;
-
-    @GSONRequired
     @SerializedName("dmaap_dr_delete_endpoint")
     private String dmaapDRDeleteEndpoint;
 
@@ -83,34 +72,10 @@
     @SerializedName("pm-mapper-filter")
     private MeasFilterConfig filterConfig;
 
-    public String getBusControllerDeliveryUrl() {
-        return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getDeliveryUrl();
-    }
-
-    public String getDcaeLocation() {
-        return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getLocation();
-    }
-
-    public String getBusControllerUserName() {
-        return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getUsername();
-    }
-
-    public String getBusControllerPassword() {
-        return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getPassword();
-    }
-
-    public URL getBusControllerSubscriptionUrl() throws MalformedURLException {
-        return new URL(this.getBusControllerSubscriptionEndpoint());
-    }
-
     public String getSubscriberIdentity(){
         return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getSubscriberId();
     }
 
-    public String getSubscriberDcaeLocation() {
-        return this.getStreamsSubscribes().getDmaapSubscriber().getDmaapInfo().getLocation();
-    }
-
     public String getPublisherTopicUrl() {
         return this.getStreamsPublishes().getDmaapPublisher().getDmaapInfo().getTopicUrl();
     }
@@ -187,10 +152,9 @@
     @Override
     public void reconfigure(MapperConfig mapperConfig) {
         if(!this.equals(mapperConfig)) {
+            this.filterConfig = mapperConfig.getFilterConfig();
             this.streamsSubscribes = mapperConfig.getStreamsSubscribes();
             this.streamsPublishes = mapperConfig.getStreamsPublishes();
-            this.busControllerSubscriptionEndpoint = mapperConfig.getBusControllerSubscriptionEndpoint();
-            this.dmaapDRFeedId = mapperConfig.getDmaapDRFeedId();
             this.dmaapDRDeleteEndpoint = mapperConfig.getDmaapDRDeleteEndpoint();
         }
     }
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java
index 0f1aaa9..a7d211f 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/model/MeasFilterConfig.java
@@ -33,6 +33,7 @@
 @NoArgsConstructor

 public class MeasFilterConfig {

 

+  @GSONRequired

   @SerializedName("filters")

   public List<Filter> filters;

 

diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java
index 5147863..23e8d71 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtils.java
@@ -20,7 +20,6 @@
 
 package org.onap.dcaegen2.services.pmmapper.utils;
 
-import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber;
 import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException;
 import org.onap.dcaegen2.services.pmmapper.model.Event;
 import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
@@ -42,12 +41,11 @@
     public static String processEvent(MapperConfig config, Event event){
         logger.unwrap().info("Sending processed to DataRouter");
         String baseDelete = config.getDmaapDRDeleteEndpoint();
-        String subscriberIdentity = DataRouterSubscriber.subscriberId;
+        String subscriberIdentity = config.getSubscriberIdentity();
         String delete = String.format("%s/%s/%s", baseDelete, subscriberIdentity, event.getPublishIdentity());
         try {
             return new RequestSender().send("DELETE", delete);
         } catch (Exception exception) {
-            logger.unwrap().error("Process event failure", exception);
             throw new ProcessEventException("Process event failure", exception);
         }
     }
diff --git a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java
index e956398..258b831 100644
--- a/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java
+++ b/src/main/java/org/onap/dcaegen2/services/pmmapper/utils/RequiredFieldDeserializer.java
@@ -25,8 +25,13 @@
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParseException;
 
+import lombok.NonNull;
+
 import java.lang.reflect.Field;
 import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Stream;
 
 
 /**
@@ -38,20 +43,35 @@
     @Override
     public T deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
         T obj = new Gson().fromJson(jsonElement, type);
-        for (Field field : obj.getClass().getDeclaredFields()) {
-            if (field.getAnnotation(GSONRequired.class) != null) {
-                field.setAccessible(true);
-                try {
-                    if (field.get(obj) == null) {
-                        throw new JsonParseException(String.format("Field: '%s', is required but not found.", field.getName()));
-                    }
-                } catch (Exception exception) {
-                    throw new JsonParseException("Failed to check fields.", exception);
-                }
+        validateRequiredFields(obj.getClass().getDeclaredFields(), obj);
+        return obj;
+    }
+
+    private void validateRequiredFields(@NonNull Field[] fields, @NonNull Object pojo) {
+        if (pojo instanceof List) {
+            final List<?> pojoList = (List<?>) pojo;
+            for (final Object pojoListPojo : pojoList) {
+                validateRequiredFields(pojoListPojo.getClass().getDeclaredFields(), pojoListPojo);
             }
         }
 
-        return obj;
+        Stream.of(fields)
+            .filter(field -> field.getAnnotation(GSONRequired.class) != null)
+            .forEach(field -> {
+                try {
+                    field.setAccessible(true);
+                    Object fieldObj = Optional.ofNullable(field.get(pojo))
+                        .orElseThrow(()-> new JsonParseException(
+                            String.format("Field '%s' in class '%s' is required but not found.",
+                            field.getName(), pojo.getClass().getSimpleName())));
+
+                    Field[] declaredFields = fieldObj.getClass().getDeclaredFields();
+                    validateRequiredFields(declaredFields, fieldObj);
+                }
+                catch (Exception exception) {
+                    throw new JsonParseException("Failed to check fields.", exception);
+                }
+            });
     }
 
 }
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
index 0d5d83c..dff2f8b 100644
--- a/src/main/resources/logback.xml
+++ b/src/main/resources/logback.xml
@@ -15,7 +15,7 @@
     <property name="p_thr" value="%thread"/>

     <property name="pattern" value="%nopexception${p_tim}\t${p_thr}\t${p_lvl}\t${p_log}\t${p_mdc}\t${p_msg}\t${p_exc}\t${p_mak}\t%n"/>

 

-    <variable name="logLevel" value="${LOG_LEVEL:-INFO}"/>

+    <variable name="logLevel" value="${LOG_LEVEL:-DEBUG}"/>

 

     <logger name="org.mockserver" level="${mockserver.logLevel:-OFF}"/>

 

diff --git a/src/test/java/org/onap/dcaegen2/pmmapper/config/ConfigHandlerTests.java b/src/test/java/org/onap/dcaegen2/pmmapper/config/ConfigHandlerTests.java
index f6aa2a8..e2bb4f5 100644
--- a/src/test/java/org/onap/dcaegen2/pmmapper/config/ConfigHandlerTests.java
+++ b/src/test/java/org/onap/dcaegen2/pmmapper/config/ConfigHandlerTests.java
@@ -25,6 +25,8 @@
 import java.io.IOException;

 import java.io.InputStreamReader;

 import java.net.UnknownHostException;

+import java.util.HashMap;

+import java.util.Map;

 

 import static org.junit.Assert.assertEquals;

 import static org.junit.Assert.assertTrue;

@@ -48,6 +50,8 @@
 import org.powermock.modules.junit4.PowerMockRunner;

 

 import com.google.gson.Gson;

+import com.google.gson.JsonObject;

+import com.google.gson.JsonParser;

 

 import ch.qos.logback.classic.spi.ILoggingEvent;

 import ch.qos.logback.core.read.ListAppender;

@@ -114,8 +118,24 @@
 

     @Test

     public void mapper_parse_valid_json_missing_attributes() throws Exception {

-        when(sender.send(anyString())).thenReturn(getFileContents("incomplete_mapper_config.json"));

-        assertThrows(MapperConfigException.class, this::getMapperConfig);

+        Map<String,String> invalidConfigs = new HashMap<>();

+        invalidConfigs.put("streams_subscribes", "{}");

+        invalidConfigs.put("streams_publishes", "{}");

+        invalidConfigs.put("streams_publishes", null);

+        invalidConfigs.remove("streams_publishes");

+        invalidConfigs.put("pm-mapper-filter", null);

+        invalidConfigs.put("pm-mapper-filter", "{}");

+        invalidConfigs.put("pm-mapper-filter", "{ \"filters\": null},");

+        invalidConfigs.put("pm-mapper-filter", "{ \"filters\": [{\"pmDefVsn\": \"V9\"}] },");

+

+        invalidConfigs.forEach( (k,v) -> {

+            try {

+                when(sender.send(anyString())).thenReturn( getInvalidConfig(k,v));

+                assertThrows(MapperConfigException.class, this::getMapperConfig);

+            } catch (Exception e) {

+                e.printStackTrace();

+            }

+        });

     }

 

     private MapperConfig getMapperConfig()

@@ -136,4 +156,10 @@
         return fileAsString;

     }

 

+    private String getInvalidConfig(String validKey, String invalidValue) {

+        JsonObject invalidConfigJson = new JsonParser().parse(validMapperConfig).getAsJsonObject();

+        invalidConfigJson.addProperty(validKey, invalidValue);

+        return invalidConfigJson.toString();

+    }

+

 }

diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java
index b4dc178..7c5340a 100644
--- a/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/AppTest.java
@@ -23,18 +23,21 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
 import static org.mockserver.integration.ClientAndServer.startClientAndServer;
 import static org.mockserver.model.HttpResponse.response;
 
+import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.List;
 
+import com.google.gson.Gson;
+import io.undertow.server.HttpServerExchange;
 import io.undertow.util.StatusCodes;
+import org.onap.dcaegen2.services.pmmapper.utils.XMLValidator;
 import reactor.core.publisher.Flux;
 
 import org.junit.jupiter.api.AfterAll;
@@ -53,6 +56,7 @@
 import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
 import org.onap.dcaegen2.services.pmmapper.utils.MeasConverter;
 import org.onap.dcaegen2.services.pmmapper.utils.MeasSplitter;
+import utils.EventUtils;
 
 
 @ExtendWith(MockitoExtension.class)
@@ -61,6 +65,13 @@
     static ClientAndServer mockServer;
     static MockServerClient client;
 
+    private static EventMetadata eventMetadata;
+
+    private static final Path dataDirectory = Paths.get("src/test/resources/mapper_test/mapping_data/");
+    private static final Path metadata = Paths.get("src/test/resources/valid_metadata.json");
+    private static final Path schema = Paths.get("src/main/resources/measCollec_plusString.xsd");
+
+
     @BeforeAll
     public static void setup() {
         mockServer =  startClientAndServer(1080);
@@ -99,6 +110,88 @@
     }
 
     @Test
+    public void testFilterByFileType_success() {
+        Event mockEvent = Mockito.mock(Event.class);
+        MapperConfig mockConfig = Mockito.mock(MapperConfig.class);
+
+        HttpServerExchange exchange = Mockito.mock(HttpServerExchange.class);
+        when(mockEvent.getHttpServerExchange()).thenReturn(exchange);
+        when(exchange.getRequestPath()).thenReturn("ATEST.xml");
+
+        boolean result = App.filterByFileType(new MeasFilterHandler(new MeasConverter()), mockEvent, mockConfig);
+        assertTrue(result);
+    }
+
+    @Test
+    public void testFilterByFileType_NonXML() {
+        Event mockEvent = Mockito.mock(Event.class);
+        MapperConfig mockConfig = Mockito.mock(MapperConfig.class);
+
+        HttpServerExchange exchange = Mockito.mock(HttpServerExchange.class);
+        when(mockEvent.getHttpServerExchange()).thenReturn(exchange);
+        when(exchange.getRequestPath()).thenReturn("ATEST.png");
+
+        boolean result = App.filterByFileType(new MeasFilterHandler(new MeasConverter()), mockEvent, mockConfig);
+        assertFalse(result);
+    }
+
+    @Test
+    public void testFilterByFileType_throwException() {
+        Event mockEvent = Mockito.mock(Event.class);
+        MeasFilterHandler mockFilter = Mockito.mock(MeasFilterHandler.class);
+        MapperConfig mockConfig = Mockito.mock(MapperConfig.class);
+
+        Mockito.when(mockFilter.filterByFileType(mockEvent)).thenThrow(RuntimeException.class);
+
+        boolean result = App.filterByFileType(mockFilter, mockEvent, mockConfig);
+        assertFalse(result);
+    }
+
+    @Test
+    public void testValidateXML_success() throws IOException {
+        XMLValidator mockValidator = new XMLValidator(schema);
+        MapperConfig mockConfig = Mockito.mock(MapperConfig.class);
+
+        String metadataFileContents = new String(Files.readAllBytes(metadata));
+        eventMetadata = new Gson().fromJson(metadataFileContents, EventMetadata.class);
+
+        Path testFile = Paths.get(dataDirectory + "/valid_data/meas_results.xml");
+        Event mockEvent = EventUtils.makeMockEvent(EventUtils.fileContentsToString(testFile), eventMetadata);
+
+        boolean result = App.validate(mockValidator, mockEvent, mockConfig);
+
+        assertTrue(result);
+    }
+
+    @Test
+    public void testValidateXML_failure() throws IOException {
+        XMLValidator mockValidator = new XMLValidator(schema);
+        MapperConfig mockConfig = Mockito.mock(MapperConfig.class);
+
+        String metadataFileContents = new String(Files.readAllBytes(metadata));
+        eventMetadata = new Gson().fromJson(metadataFileContents, EventMetadata.class);
+
+        Path testFile = Paths.get(dataDirectory + "/invalid_data/no_managed_element.xml");
+        Event mockEvent = EventUtils.makeMockEvent(EventUtils.fileContentsToString(testFile), eventMetadata);
+
+        boolean result = App.validate(mockValidator, mockEvent, mockConfig);
+
+        assertFalse(result);
+    }
+
+    @Test
+    public void testValidateXML_throwException() {
+        Event mockEvent = Mockito.mock(Event.class);
+        XMLValidator mockValidator = Mockito.mock(XMLValidator.class);
+        MapperConfig mockConfig = Mockito.mock(MapperConfig.class);
+
+        Mockito.when(mockValidator.validate(mockEvent)).thenThrow(RuntimeException.class);
+        boolean result = App.validate(mockValidator, mockEvent, mockConfig);
+
+        assertFalse(result);
+    }
+
+    @Test
     public void testFilter_success() {
         Event mockEvent = Mockito.mock(Event.class);
         List<Event> mockEvents = Arrays.asList(mockEvent);
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfigurationTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfigurationTest.java
index 8840825..c900942 100644
--- a/src/test/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfigurationTest.java
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/config/DynamicConfigurationTest.java
@@ -105,7 +105,7 @@
         Configurable configurable = mock(Configurable.class);
         configurables.add(configurable);
         JsonObject modifiedConfig = new JsonParser().parse(config).getAsJsonObject();
-        modifiedConfig.addProperty("dmaap_dr_feed_id","3");
+        modifiedConfig.addProperty("dmaap_dr_delete_endpoint","http://modified-delete-endpoint/1");
         when(sender.send(any())).thenReturn(modifiedConfig.toString());
         MapperConfig modifiedMapperConfig = configHandler.getMapperConfig();
 
@@ -137,7 +137,7 @@
         Configurable configurable = mock(Configurable.class);
         configurables.add(configurable);
         JsonObject modifiedConfig = new JsonParser().parse(config).getAsJsonObject();
-        modifiedConfig.addProperty("dmaap_dr_feed_id","3");
+        modifiedConfig.addProperty("dmaap_dr_delete_endpoint","http://modified-delete-endpoint/1");
 
         when(sender.send(any())).thenReturn(modifiedConfig.toString());
         MapperConfig modifiedMapperConfig = configHandler.getMapperConfig();
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java
deleted file mode 100644
index dbb95a7..0000000
--- a/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DataRouterSubscriberTest.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- *  Copyright (C) 2019 Nordix Foundation.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.pmmapper.datarouter;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.read.ListAppender;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import io.undertow.io.Receiver;
-import io.undertow.io.Sender;
-import io.undertow.server.HttpServerExchange;
-import io.undertow.util.HeaderMap;
-import io.undertow.util.StatusCodes;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.jupiter.api.Assertions;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.onap.dcaegen2.services.pmmapper.config.ConfigHandler;
-import org.onap.dcaegen2.services.pmmapper.exceptions.ReconfigurationException;
-import org.onap.dcaegen2.services.pmmapper.exceptions.TooManyTriesException;
-import org.onap.dcaegen2.services.pmmapper.model.EnvironmentConfig;
-import org.onap.dcaegen2.services.pmmapper.model.Event;
-import org.onap.dcaegen2.services.pmmapper.model.MapperConfig;
-import org.onap.dcaegen2.services.pmmapper.utils.HttpServerExchangeAdapter;
-import org.onap.dcaegen2.services.pmmapper.utils.RequestSender;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import utils.LoggingUtils;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({DataRouterSubscriber.class, EnvironmentConfig.class})
-public class DataRouterSubscriberTest {
-
-    private Path VALID_BC_RESPONSE_PATH = Paths.get("src/test/resources/datarouter_subscriber_test/valid_bc_response.json");
-    private Path VALID_METADATA_PATH = Paths.get("src/test/resources/valid_metadata.json");
-    private Path INVALID_METADATA_PATH = Paths.get("src/test/resources/invalid_metadata.json");
-    private Path VALID_CONFIG_PATH = Paths.get("src/test/resources/valid_mapper_config.json");
-
-    @Mock
-    private EventReceiver eventReceiver;
-    @Mock
-    private MapperConfig config;
-
-    private DataRouterSubscriber objUnderTest;
-
-    @Before
-    public void setUp() {
-        objUnderTest = new DataRouterSubscriber(eventReceiver, config);
-    }
-
-    @Test
-    public void testStartTooManyTriesWithResponse() throws IOException {
-        PowerMockito.mockStatic(Thread.class);
-
-        URL subEndpoint = mock(URL.class);
-        when(config.getBusControllerSubscriptionUrl()).thenReturn(subEndpoint);
-        HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
-        when(subEndpoint.openConnection()).thenReturn(huc);
-        when(huc.getResponseCode()).thenReturn(300);
-        Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start());
-    }
-
-    @Test
-    public void testStartImmediateSuccess() throws IOException, TooManyTriesException, InterruptedException {
-        URL subEndpoint = mock(URL.class);
-        when(config.getBusControllerSubscriptionUrl()).thenReturn(subEndpoint);
-        HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
-        String bcResponse = new String(Files.readAllBytes(VALID_BC_RESPONSE_PATH));
-        InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8));
-        when(huc.getInputStream()).thenReturn(responseStream);
-        when(subEndpoint.openConnection()).thenReturn(huc);
-        when(huc.getResponseCode()).thenReturn(200);
-        objUnderTest.start();
-        verify(huc, times(1)).getResponseCode();
-    }
-
-    @Test
-    public void testStartDelayedSuccess() throws IOException, TooManyTriesException, InterruptedException {
-        PowerMockito.mockStatic(Thread.class);
-        HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
-        String bcResponse = new String(Files.readAllBytes(VALID_BC_RESPONSE_PATH));
-        InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8));
-        when(huc.getInputStream()).thenReturn(responseStream);
-        URL subEndpoint = mock(URL.class);
-        when(config.getBusControllerSubscriptionUrl()).thenReturn(subEndpoint);
-
-        when(subEndpoint.openConnection()).thenReturn(huc);
-        doAnswer(new Answer() {
-            boolean forceRetry = true;
-
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-                if (forceRetry) {
-                    forceRetry = false;
-                    throw new IOException();
-                }
-                return 200;
-            }
-        }).when(huc).getResponseCode();
-        objUnderTest.start();
-        verify(huc, times(2)).getResponseCode();
-    }
-
-    @Test
-    public void testStartReadTimeout() throws IOException {
-        PowerMockito.mockStatic(Thread.class);
-
-        URL subEndpoint = mock(URL.class);
-        when(config.getBusControllerSubscriptionUrl()).thenReturn(subEndpoint);
-        HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
-        when(subEndpoint.openConnection()).thenReturn(huc);
-        doThrow(new IOException()).when(huc).getResponseCode();
-        Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start());
-    }
-
-    @Test
-    public void testRequestInboundLimitedStateServiceUnavailable() throws Exception {
-        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class);
-        HttpServerExchangeAdapter adapterMock = PowerMockito.mock(HttpServerExchangeAdapter.class);
-        PowerMockito.whenNew(HttpServerExchangeAdapter.class).withAnyArguments().thenReturn(adapterMock);
-
-        Sender responseSender = mock(Sender.class);
-        when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
-        when(httpServerExchange.getResponseSender()).thenReturn(responseSender);
-        objUnderTest.setLimited(true);
-        objUnderTest.handleRequest(httpServerExchange);
-        verify(httpServerExchange).setStatusCode(StatusCodes.SERVICE_UNAVAILABLE);
-    }
-
-    @Test
-    public void testRequestInboundLimitedStateServiceNoEmission() throws Exception {
-        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class);
-        HttpServerExchangeAdapter adapterMock = PowerMockito.mock(HttpServerExchangeAdapter.class);
-        PowerMockito.whenNew(HttpServerExchangeAdapter.class).withAnyArguments().thenReturn(adapterMock);
-
-        Sender responseSender = mock(Sender.class);
-        when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
-        when(httpServerExchange.getResponseSender()).thenReturn(responseSender);
-        objUnderTest.setLimited(true);
-        objUnderTest.handleRequest(httpServerExchange);
-        verify(eventReceiver, times(0)).receive(any());
-    }
-
-    @Test
-    public void testStartPositiveResponseCodeInvalidResponseBody() throws Exception{
-        PowerMockito.mockStatic(EnvironmentConfig.class);
-        PowerMockito.mockStatic(Thread.class);
-        PowerMockito.when(EnvironmentConfig.getCBSHostName()).thenReturn("");
-        PowerMockito.when(EnvironmentConfig.getCBSPort()).thenReturn(1);
-        PowerMockito.when(EnvironmentConfig.getServiceName()).thenReturn("");
-
-        URL mockURL = mock(URL.class);
-        when(config.getBusControllerSubscriptionUrl()).thenReturn(mockURL);
-        HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
-        String bcResponse = "not a valid response";
-        InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8));
-        when(huc.getInputStream()).thenReturn(responseStream);
-        when(mockURL.openConnection()).thenReturn(huc);
-        when(huc.getResponseCode()).thenReturn(200);
-        Assertions.assertThrows(TooManyTriesException.class, () -> objUnderTest.start());
-    }
-
-    @Test
-    public void testRequestInboundInvalidMetadata() throws Exception {
-        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
-        JsonObject metadata = new JsonParser().parse(new String(Files
-                .readAllBytes(INVALID_METADATA_PATH))).getAsJsonObject();
-        when(httpServerExchange.getRequestHeaders().get(any(String.class)).get(anyInt()))
-                .thenReturn(metadata.toString());
-        when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
-        objUnderTest.handleRequest(httpServerExchange);
-        verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.BAD_REQUEST);
-        verify(httpServerExchange.getResponseSender(), times(1)).send("Malformed Metadata.");
-
-    }
-
-    @Test
-    public void testRequestInboundNoMetadata() throws Exception {
-        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
-        Receiver receiver = mock(Receiver.class);
-        HeaderMap headers = mock(HeaderMap.class);
-        when(httpServerExchange.getRequestReceiver()).thenReturn(receiver);
-        when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
-        when(httpServerExchange.getRequestHeaders()).thenReturn(headers);
-        when(headers.get(any(String.class))).thenReturn(null);
-
-        doAnswer((Answer<Void>) invocationOnMock -> {
-            Receiver.FullStringCallback callback = invocationOnMock.getArgument(0);
-            callback.handle(httpServerExchange, "");
-            return null;
-        }).when(receiver).receiveFullString(any());
-        doAnswer((Answer<Void>) invocationOnMock -> {
-            Runnable runnable = invocationOnMock.getArgument(0);
-            runnable.run();
-            return null;
-        }).when(httpServerExchange).dispatch(any(Runnable.class));
-        objUnderTest.handleRequest(httpServerExchange);
-        verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.BAD_REQUEST);
-        verify(httpServerExchange.getResponseSender(), times(1)).send("Missing Metadata.");
-
-    }
-
-    @Test
-    public void testRequestInboundSuccess() throws Exception {
-        ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DataRouterSubscriber.class);
-        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
-        Receiver receiver = mock(Receiver.class);
-        when(httpServerExchange.getRequestReceiver()).thenReturn(receiver);
-        String testString = "MESSAGE BODY";
-        JsonObject metadata = new JsonParser().parse(
-                new String(Files.readAllBytes(VALID_METADATA_PATH))).getAsJsonObject();
-        when(httpServerExchange.getRequestHeaders().get(DataRouterSubscriber.METADATA_HEADER).get(anyInt()))
-                .thenReturn(metadata.toString());
-        when(httpServerExchange.getRequestHeaders().get(DataRouterSubscriber.PUB_ID_HEADER).getFirst()).thenReturn("");
-        doAnswer((Answer<Void>) invocationOnMock -> {
-            Receiver.FullStringCallback callback = invocationOnMock.getArgument(0);
-            callback.handle(httpServerExchange, testString);
-            return null;
-        }).when(receiver).receiveFullString(any());
-
-        doAnswer((Answer<Void>) invocationOnMock -> {
-            Runnable runnable = invocationOnMock.getArgument(0);
-            runnable.run();
-            return null;
-        }).when(httpServerExchange).dispatch(any(Runnable.class));
-
-        objUnderTest.handleRequest(httpServerExchange);
-        verify(eventReceiver, times(1)).receive(any(Event.class));
-
-        assertEquals(logAppender.list.get(0).getMarker().getName(), "ENTRY");
-        assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("InvocationID"));
-        assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("RequestID"));
-        assertEquals(logAppender.list.get(1).getMarker().getName(), "EXIT");
-        logAppender.stop();
-    }
-
-    @Test
-    public void testConfigThrowsMalformedURLException() throws MalformedURLException {
-        when(config.getBusControllerSubscriptionUrl()).thenThrow(MalformedURLException.class);
-        Assertions.assertThrows(IllegalStateException.class, () -> objUnderTest.start());
-    }
-    @Test
-    public void testReconfigurationSameConfig() throws Exception {
-        PowerMockito.mockStatic(EnvironmentConfig.class);
-        PowerMockito.when(EnvironmentConfig.getCBSHostName()).thenReturn("");
-        PowerMockito.when(EnvironmentConfig.getCBSPort()).thenReturn(1);
-        PowerMockito.when(EnvironmentConfig.getServiceName()).thenReturn("");
-
-        RequestSender sender = mock(RequestSender.class);
-        String mapperConfig = new String(Files.readAllBytes(VALID_CONFIG_PATH));
-        when(sender.send(any())).thenReturn(mapperConfig);
-        MapperConfig originalMapperConfig = new ConfigHandler(sender).getMapperConfig();
-
-        DataRouterSubscriber objUnderTest = new DataRouterSubscriber(eventReceiver, originalMapperConfig);
-        objUnderTest.reconfigure(originalMapperConfig);
-        assertEquals(originalMapperConfig, objUnderTest.getConfig());
-    }
-
-    @Test
-    public void testReconfigurationModifiedFeedId() throws Exception {
-        PowerMockito.mockStatic(EnvironmentConfig.class);
-        PowerMockito.when(EnvironmentConfig.getCBSHostName()).thenReturn("");
-        PowerMockito.when(EnvironmentConfig.getCBSPort()).thenReturn(1);
-        PowerMockito.when(EnvironmentConfig.getServiceName()).thenReturn("");
-
-        URL mockURL = mock(URL.class);
-        when(config.getBusControllerSubscriptionUrl()).thenReturn(mockURL);
-        HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
-        String bcResponse = new String(Files.readAllBytes(VALID_BC_RESPONSE_PATH));
-        InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8));
-        when(huc.getInputStream()).thenReturn(responseStream);
-        when(mockURL.openConnection()).thenReturn(huc);
-        when(huc.getResponseCode()).thenReturn(200);
-
-        PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL);
-
-        RequestSender sender = mock(RequestSender.class);
-        String mapperConfig = new String(Files.readAllBytes(VALID_CONFIG_PATH));
-        when(sender.send(any())).thenReturn(mapperConfig);
-        MapperConfig originalMapperConfig = new ConfigHandler(sender).getMapperConfig();
-        JsonObject modifiedMapperConfigObj = new JsonParser().parse(mapperConfig).getAsJsonObject();
-        modifiedMapperConfigObj.addProperty("dmaap_dr_feed_id", "3");
-        when(sender.send(any())).thenReturn(modifiedMapperConfigObj.toString());
-        MapperConfig modifiedMapperConfig = new ConfigHandler(sender).getMapperConfig();
-
-        DataRouterSubscriber objUnderTest = new DataRouterSubscriber(eventReceiver, originalMapperConfig);
-        objUnderTest.reconfigure(modifiedMapperConfig);
-        assertEquals(modifiedMapperConfig, objUnderTest.getConfig());
-    }
-
-    @Test
-    public void testReconfigurationModifiedUsername() throws Exception {
-        PowerMockito.mockStatic(EnvironmentConfig.class);
-        PowerMockito.when(EnvironmentConfig.getCBSHostName()).thenReturn("");
-        PowerMockito.when(EnvironmentConfig.getCBSPort()).thenReturn(1);
-        PowerMockito.when(EnvironmentConfig.getServiceName()).thenReturn("");
-
-        URL mockURL = mock(URL.class);
-        when(config.getBusControllerSubscriptionUrl()).thenReturn(mockURL);
-        HttpURLConnection huc = mock(HttpURLConnection.class, RETURNS_DEEP_STUBS);
-        String bcResponse = new String(Files.readAllBytes(VALID_BC_RESPONSE_PATH));
-        InputStream responseStream = new ByteArrayInputStream(bcResponse.getBytes(StandardCharsets.UTF_8));
-        when(huc.getInputStream()).thenReturn(responseStream);
-        when(mockURL.openConnection()).thenReturn(huc);
-        when(huc.getResponseCode()).thenReturn(200);
-
-        PowerMockito.whenNew(URL.class).withAnyArguments().thenReturn(mockURL);
-
-        RequestSender sender = mock(RequestSender.class);
-        String mapperConfig = new String(Files.readAllBytes(VALID_CONFIG_PATH));
-        when(sender.send(any())).thenReturn(mapperConfig);
-        MapperConfig originalMapperConfig = new ConfigHandler(sender).getMapperConfig();
-        JsonObject modifiedMapperConfigObj = new JsonParser().parse(mapperConfig).getAsJsonObject();
-        modifiedMapperConfigObj.get("streams_subscribes")
-                .getAsJsonObject().get("dmaap_subscriber")
-                .getAsJsonObject().get("dmaap_info")
-                .getAsJsonObject()
-                .addProperty("username", "bob");
-        when(sender.send(any())).thenReturn(modifiedMapperConfigObj.toString());
-        MapperConfig modifiedMapperConfig = new ConfigHandler(sender).getMapperConfig();
-
-        DataRouterSubscriber objUnderTest = new DataRouterSubscriber(eventReceiver, originalMapperConfig);
-        objUnderTest.reconfigure(modifiedMapperConfig);
-        assertEquals(modifiedMapperConfig, objUnderTest.getConfig());
-    }
-
-    @Test
-    public void testReconfigurationMalformedURL() throws Exception {
-        when(config.getBusControllerSubscriptionUrl()).thenThrow(MalformedURLException.class);
-        Assertions.assertThrows(IllegalStateException.class, () -> objUnderTest.reconfigure(config));
-    }
-    @Test
-    public void testReconfigurationException() throws Exception {
-        PowerMockito.mockStatic(Thread.class);
-        URL url = mock(URL.class);
-        when(url.toString()).thenReturn("http://valid:8080/");
-        when(url.openConnection()).thenThrow(IOException.class);
-        when(config.getBusControllerSubscriptionUrl()).thenReturn(url);
-        Assertions.assertThrows(ReconfigurationException.class, () -> objUnderTest.reconfigure(config));
-    }
-}
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DeliveryHandlerTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DeliveryHandlerTest.java
new file mode 100644
index 0000000..94a2c7d
--- /dev/null
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/datarouter/DeliveryHandlerTest.java
@@ -0,0 +1,148 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.pmmapper.datarouter;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.undertow.io.Receiver;
+import io.undertow.server.HttpServerExchange;
+import io.undertow.util.HeaderMap;
+import io.undertow.util.StatusCodes;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.stubbing.Answer;
+import org.onap.dcaegen2.services.pmmapper.model.EnvironmentConfig;
+import org.onap.dcaegen2.services.pmmapper.model.Event;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import utils.LoggingUtils;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({DeliveryHandler.class, EnvironmentConfig.class})
+public class DeliveryHandlerTest {
+
+    private Path VALID_METADATA_PATH = Paths.get("src/test/resources/valid_metadata.json");
+    private Path INVALID_METADATA_PATH = Paths.get("src/test/resources/invalid_metadata.json");
+
+    @Mock
+    private EventReceiver eventReceiver;
+
+    private DeliveryHandler objUnderTest;
+
+    @Before
+    public void setUp() {
+        objUnderTest = new DeliveryHandler(eventReceiver);
+    }
+
+    @Test
+    public void testRequestInboundInvalidMetadata() throws Exception {
+        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+        JsonObject metadata = new JsonParser().parse(new String(Files
+                .readAllBytes(INVALID_METADATA_PATH))).getAsJsonObject();
+        when(httpServerExchange.getRequestHeaders().get(any(String.class)).get(anyInt()))
+                .thenReturn(metadata.toString());
+        when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
+        objUnderTest.handleRequest(httpServerExchange);
+        verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.BAD_REQUEST);
+        verify(httpServerExchange.getResponseSender(), times(1)).send("Malformed Metadata.");
+
+    }
+
+    @Test
+    public void testRequestInboundNoMetadata() throws Exception {
+        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+        Receiver receiver = mock(Receiver.class);
+        HeaderMap headers = mock(HeaderMap.class);
+        when(httpServerExchange.getRequestReceiver()).thenReturn(receiver);
+        when(httpServerExchange.setStatusCode(anyInt())).thenReturn(httpServerExchange);
+        when(httpServerExchange.getRequestHeaders()).thenReturn(headers);
+        when(headers.get(any(String.class))).thenReturn(null);
+
+        doAnswer((Answer<Void>) invocationOnMock -> {
+            Receiver.FullStringCallback callback = invocationOnMock.getArgument(0);
+            callback.handle(httpServerExchange, "");
+            return null;
+        }).when(receiver).receiveFullString(any());
+        doAnswer((Answer<Void>) invocationOnMock -> {
+            Runnable runnable = invocationOnMock.getArgument(0);
+            runnable.run();
+            return null;
+        }).when(httpServerExchange).dispatch(any(Runnable.class));
+        objUnderTest.handleRequest(httpServerExchange);
+        verify(httpServerExchange, times(1)).setStatusCode(StatusCodes.BAD_REQUEST);
+        verify(httpServerExchange.getResponseSender(), times(1)).send("Missing Metadata.");
+
+    }
+
+    @Test
+    public void testRequestInboundSuccess() throws Exception {
+        ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DeliveryHandler.class);
+        HttpServerExchange httpServerExchange = mock(HttpServerExchange.class, RETURNS_DEEP_STUBS);
+        Receiver receiver = mock(Receiver.class);
+        when(httpServerExchange.getRequestReceiver()).thenReturn(receiver);
+        String testString = "MESSAGE BODY";
+        JsonObject metadata = new JsonParser().parse(
+                new String(Files.readAllBytes(VALID_METADATA_PATH))).getAsJsonObject();
+        when(httpServerExchange.getRequestHeaders().get(DeliveryHandler.METADATA_HEADER).get(anyInt()))
+                .thenReturn(metadata.toString());
+        when(httpServerExchange.getRequestHeaders().get(DeliveryHandler.PUB_ID_HEADER).getFirst()).thenReturn("");
+        doAnswer((Answer<Void>) invocationOnMock -> {
+            Receiver.FullStringCallback callback = invocationOnMock.getArgument(0);
+            callback.handle(httpServerExchange, testString);
+            return null;
+        }).when(receiver).receiveFullString(any());
+
+        doAnswer((Answer<Void>) invocationOnMock -> {
+            Runnable runnable = invocationOnMock.getArgument(0);
+            runnable.run();
+            return null;
+        }).when(httpServerExchange).dispatch(any(Runnable.class));
+
+        objUnderTest.handleRequest(httpServerExchange);
+        verify(eventReceiver, times(1)).receive(any(Event.class));
+
+        assertEquals(logAppender.list.get(0).getMarker().getName(), "ENTRY");
+        assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("InvocationID"));
+        assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("RequestID"));
+        assertEquals(logAppender.list.get(1).getMarker().getName(), "EXIT");
+        logAppender.stop();
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java
index 9975849..b2e6308 100644
--- a/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java
+++ b/src/test/java/org/onap/dcaegen2/services/pmmapper/utils/DataRouterUtilsTest.java
@@ -40,7 +40,6 @@
 import com.google.gson.JsonParser;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.onap.dcaegen2.services.pmmapper.datarouter.DataRouterSubscriber;
 import org.onap.dcaegen2.services.pmmapper.exceptions.ProcessEventException;
 import org.onap.dcaegen2.services.pmmapper.model.Event;
 import org.onap.dcaegen2.services.pmmapper.model.EventMetadata;
@@ -57,7 +56,7 @@
 import javax.net.ssl.SSLContext;
 
 @PowerMockIgnore({"org.apache.http.conn.ssl.*", "javax.net.ssl.*" , "javax.crypto.*"})
-@PrepareForTest({RequestSender.class,DataRouterSubscriber.class})
+@PrepareForTest(RequestSender.class)
 @RunWith(PowerMockRunner.class)
 public class DataRouterUtilsTest {
 
diff --git a/src/test/resources/datarouter_subscriber_test/valid_bc_response.json b/src/test/resources/datarouter_subscriber_test/valid_bc_response.json
deleted file mode 100644
index 201b786..0000000
--- a/src/test/resources/datarouter_subscriber_test/valid_bc_response.json
+++ /dev/null
@@ -1,15 +0,0 @@
-{
-  "type": "dr_Sub",
-  "lastMod": "2019-03-11T14:29:39.659",
-  "status": "VALID",
-  "dcaeLocationName": "location",
-  "deliveryURL": "delivery_url",
-  "feedId": "2",
-  "logURL": "https://dmaap-dr-prov/sublog/2",
-  "owner": "DGL",
-  "subId": "1",
-  "suspended": false,
-  "use100": false,
-  "username": "username",
-  "userpwd": "password"
-}
\ No newline at end of file
diff --git a/src/test/resources/incomplete_mapper_config.json b/src/test/resources/incomplete_mapper_config.json
deleted file mode 100644
index ed4ebd7..0000000
--- a/src/test/resources/incomplete_mapper_config.json
+++ /dev/null
@@ -1,24 +0,0 @@
-{

-    "_comment": "This mapper config is missing streams_subscribes",

-    "pm-mapper-filter": {

-        "filters": "{[]}"

-    },

-    "3GPP.schema.file": "{\"3GPP_Schema\":\"./etc/3GPP_relaxed_schema.xsd\"}",

-    "streams_subscribes": null,

-    "streams_publishes": {

-        "pm_mapper_handle_out": {

-            "type": "message_router",

-            "aaf_password": null,

-            "dmaap_info": {

-                "topic_url": "https://we-are-message-router.us:3905/events/some-topic",

-                "client_role": null,

-                "location": null,

-                "client_id": null

-            },

-            "aaf_username": null

-        }

-    },

-    "some parameter": "unauthenticated.PM_VES_OUTPUT",

-    "some field": "1",

-    "services_calls": {}

-}
\ No newline at end of file
diff --git a/src/test/resources/multiple_filter_mapper_config.json b/src/test/resources/multiple_filter_mapper_config.json
index 89bca57..251beb2 100644
--- a/src/test/resources/multiple_filter_mapper_config.json
+++ b/src/test/resources/multiple_filter_mapper_config.json
@@ -27,8 +27,11 @@
       "aaf_username": null
     }
   },
-  "dmaap_dr_feed_id": "2",
-  "buscontroller_feed_subscription_endpoint": "http://dmaap-bc.onap.svc.cluster.local:8080/webapi/dr_subs",
   "dmaap_dr_delete_endpoint": "http://dmaap-dr-node.onap.svc.cluster.local:8443/delete",
-  "services_calls": {}
+  "services_calls": {},
+  "key_store_path": "src/test/resources/testkeystore.jks.b64",
+  "key_store_pass_path": "src/test/resources/password",
+  "trust_store_path": "src/test/resources/testkeystore.jks.b64",
+  "trust_store_pass_path": "src/test/resources/password",
+  "enable_http": false
 }
\ No newline at end of file
diff --git a/src/test/resources/no_filter_mapper_config.json b/src/test/resources/no_filter_mapper_config.json
index 3f855cf..87fc021 100644
--- a/src/test/resources/no_filter_mapper_config.json
+++ b/src/test/resources/no_filter_mapper_config.json
@@ -27,8 +27,11 @@
       "aaf_username": null
     }
   },
-  "dmaap_dr_feed_id": "2",
-  "buscontroller_feed_subscription_endpoint": "http://dmaap-bc.onap.svc.cluster.local:8080/webapi/dr_subs",
   "dmaap_dr_delete_endpoint": "http://dmaap-dr-node.onap.svc.cluster.local:8443/delete",
-  "services_calls": {}
+  "services_calls": {},
+  "key_store_path": "src/test/resources/testkeystore.jks.b64",
+  "key_store_pass_path": "src/test/resources/password",
+  "trust_store_path": "src/test/resources/testkeystore.jks.b64",
+  "trust_store_pass_path": "src/test/resources/password",
+  "enable_http": false
 }
\ No newline at end of file
diff --git a/src/test/resources/valid_mapper_config.json b/src/test/resources/valid_mapper_config.json
index e37b77e..3d9d707 100644
--- a/src/test/resources/valid_mapper_config.json
+++ b/src/test/resources/valid_mapper_config.json
@@ -27,8 +27,6 @@
             "aaf_username": null

         }

     },

-    "dmaap_dr_feed_id": "2",

-    "buscontroller_feed_subscription_endpoint": "http://dmaap-bc.onap.svc.cluster.local:8080/webapi/dr_subs",

     "dmaap_dr_delete_endpoint": "http://dmaap-dr-node.onap.svc.cluster.local:8443/delete",

     "services_calls": {},

     "key_store_path": "src/test/resources/testkeystore.jks.b64",

diff --git a/src/test/resources/valid_metadata.json b/src/test/resources/valid_metadata.json
index cf21437..21de3fb 100644
--- a/src/test/resources/valid_metadata.json
+++ b/src/test/resources/valid_metadata.json
@@ -8,6 +8,5 @@
   "location": "ftpes://192.168.0.101:22/ftp/rop/A20161224.1045-1100.bin.gz",
   "compression": "gzip",
   "fileFormatType": "org.3GPP.32.435#measCollec",
-  "fileFormatVersion": "V9",
-  "decompression_status": "false"
+  "fileFormatVersion": "V9"
 }
\ No newline at end of file