diff --git a/.gitignore b/.gitignore
index d03f9a9..37707c8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -45,7 +45,7 @@
 .mvn/timing.properties
 .mvn/wrapper/maven-wrapper.jar
 
-# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
-hs_err_pid*
+# CheckStyle files
+.checkstyle
 
 opt/
\ No newline at end of file
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index 82c390f..f89a101 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -42,6 +42,8 @@
 import org.springframework.stereotype.Component;
 
 /**
+ * Holds all configuration for the DFC.
+ *
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
  */
@@ -79,8 +81,10 @@
         return ftpesConfiguration;
     }
 
+    /**
+     * Reads the configuration from file.
+     */
     public void loadConfigurationFromFile() {
-
         GsonBuilder gsonBuilder = new GsonBuilder();
         ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
         JsonParser parser = new JsonParser();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index b4dc635..5272333 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -1,4 +1,4 @@
-/*
+/*-
  * ============LICENSE_START======================================================================
  * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
@@ -18,6 +18,7 @@
 
 import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.INVOCATION_ID;
 import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
@@ -25,7 +26,9 @@
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ScheduledFuture;
+
 import javax.annotation.PostConstruct;
+
 import org.apache.commons.lang3.StringUtils;
 import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
 import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
@@ -40,11 +43,15 @@
 import org.springframework.http.ResponseEntity;
 import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.annotation.EnableScheduling;
+
 import io.swagger.annotations.ApiOperation;
 import reactor.core.publisher.Mono;
 
 /**
+ * Api for starting and stopping DFC.
+ *
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
  */
 @Configuration
 @EnableScheduling
@@ -63,11 +70,18 @@
     private final ScheduledTasks scheduledTask;
     private final CloudConfiguration cloudConfiguration;
 
+    /**
+     * Constructor.
+     *
+     * @param taskScheduler The scheduler used to schedule the tasks.
+     * @param scheduledTasks The scheduler that will actually handle the tasks.
+     * @param cloudConfiguration The DFC configuration.
+     */
     @Autowired
-    public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTask,
-        CloudConfiguration cloudConfiguration) {
+    public SchedulerConfig(TaskScheduler taskScheduler, ScheduledTasks scheduledTasks,
+            CloudConfiguration cloudConfiguration) {
         this.taskScheduler = taskScheduler;
-        this.scheduledTask = scheduledTask;
+        this.scheduledTask = scheduledTasks;
         this.cloudConfiguration = cloudConfiguration;
     }
 
@@ -84,7 +98,7 @@
         logger.info(EXIT, "Stopped Datafile workflow");
         MDC.clear();
         return Mono.defer(() -> Mono
-            .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
+                .just(new ResponseEntity<>("Datafile Service has already been stopped!", HttpStatus.CREATED)));
     }
 
     /**
@@ -106,12 +120,14 @@
         contextMap = MDC.getCopyOfContextMap();
         logger.info(ENTRY, "Start scheduling Datafile workflow");
         if (scheduledFutureList.isEmpty()) {
-            scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap), Instant.now(),
-                    SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
-            scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
-                    SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
-            scheduledFutureList.add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
-                   SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
+            scheduledFutureList.add(taskScheduler.scheduleAtFixedRate(() -> cloudConfiguration.runTask(contextMap),
+                    Instant.now(), SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY));
+            scheduledFutureList.add(
+                    taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.scheduleMainDatafileEventTask(contextMap),
+                            SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS));
+            scheduledFutureList
+                    .add(taskScheduler.scheduleWithFixedDelay(() -> scheduledTask.purgeCachedInformation(Instant.now()),
+                            SCHEDULING_DELAY_FOR_DATAFILE_PURGE_CACHE));
 
             return true;
         } else {
@@ -119,4 +135,4 @@
         }
 
     }
-}
\ No newline at end of file
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index a8f79ea..af45cc9 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -88,11 +88,17 @@
         }
     }
 
+    /**
+     * Parses the Json message and returns a stream of messages.
+     *
+     * @param rawMessage the Json message to parse.
+     * @return a <code>Flux</code> containing messages.
+     */
     public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
         return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData);
     }
 
-    public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
+    Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
         JsonParser jsonParser = new JsonParser();
         return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
                 : element.isJsonObject() ? Optional.of((JsonObject) element)
@@ -136,13 +142,11 @@
                 List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap);
                 if (!allFileDataFromJson.isEmpty()) {
                     MessageMetaData messageMetaData = optionalMessageMetaData.get();
-                    // @formatter:off
-                    return Mono.just(ImmutableFileReadyMessage.builder()
-                            .pnfName(messageMetaData.sourceName())
-                            .messageMetaData(messageMetaData)
-                            .files(allFileDataFromJson)
+                    return Mono.just(ImmutableFileReadyMessage.builder() //
+                            .pnfName(messageMetaData.sourceName()) //
+                            .messageMetaData(messageMetaData) //
+                            .files(allFileDataFromJson) //
                             .build());
-                    // @formatter:on
                 } else {
                     return Mono.empty();
                 }
@@ -168,18 +172,16 @@
         // version.
         getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues);
 
-        // @formatter:off
-        MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
-                .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues))
-                .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues))
-                .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues))
-                .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues))
-                .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues))
-                .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues))
-                .changeIdentifier(changeIdentifier)
-                .changeType(changeType)
+        MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+                .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues)) //
+                .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues)) //
+                .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues)) //
+                .sourceName(getValueFromJson(commonEventHeader, SOURCE_NAME, missingValues)) //
+                .startEpochMicrosec(getValueFromJson(commonEventHeader, START_EPOCH_MICROSEC, missingValues)) //
+                .timeZoneOffset(getValueFromJson(commonEventHeader, TIME_ZONE_OFFSET, missingValues)) //
+                .changeIdentifier(changeIdentifier) //
+                .changeType(changeType) //
                 .build();
-        // @formatter:on
         if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
             return Optional.of(messageMetaData);
         } else {
@@ -231,16 +233,14 @@
             logger.error("Unable to collect file from xNF.", e);
             return Optional.empty();
         }
-        // @formatter:off
-        FileData fileData = ImmutableFileData.builder()
-                .name(getValueFromJson(fileInfo, NAME, missingValues))
-                .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues))
-                .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues))
-                .location(location)
-                .scheme(scheme)
-                .compression(getValueFromJson(data, COMPRESSION, missingValues))
+        FileData fileData = ImmutableFileData.builder() //
+                .name(getValueFromJson(fileInfo, NAME, missingValues)) //
+                .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues)) //
+                .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues)) //
+                .location(location) //
+                .scheme(scheme) //
+                .compression(getValueFromJson(data, COMPRESSION, missingValues)) //
                 .build();
-        // @formatter:on
         if (missingValues.isEmpty()) {
             return Optional.of(fileData);
         }
@@ -250,8 +250,8 @@
     }
 
     /**
-     * Gets data from the event name, defined as:
-     * {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
+     * Gets data from the event name.
+     * Defined as: {DomainAbbreviation}_{productName}-{vendorName}_{Description}, example:
      * Noti_RnNode-Ericsson_FileReady
      *
      * @param dataType The type of data to get, {@link DmaapConsumerJsonParser.EventNameDataType}.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
index 2cb8411..e2dca18 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/PublishedFileCache.java
@@ -13,6 +13,7 @@
  * the License.
  * ============LICENSE_END========================================================================
  */
+
 package org.onap.dcaegen2.collectors.datafile.service;
 
 import java.nio.file.Path;
@@ -29,14 +30,30 @@
 public class PublishedFileCache {
     private final Map<Path, Instant> publishedFiles = Collections.synchronizedMap(new HashMap<Path, Instant>());
 
+    /**
+     * Adds a file to the cache.
+     *
+     * @param path the name of the file to add.
+     * @return <code>null</code> if the file is not already in the cache.
+     */
     public Instant put(Path path) {
         return publishedFiles.put(path, Instant.now());
     }
 
+    /**
+     * Removes a file from the cache.
+     *
+     * @param localFileName name of the file to remove.
+     */
     public void remove(Path localFileName) {
         publishedFiles.remove(localFileName);
     }
 
+    /**
+     * Removes files 24 hours older than the given instant.
+     *
+     * @param now the instant will determine which files that will be purged.
+     */
     public void purge(Instant now) {
         for (Iterator<Map.Entry<Path, Instant>> it = publishedFiles.entrySet().iterator(); it.hasNext();) {
             Map.Entry<Path, Instant> pair = it.next();
@@ -46,7 +63,7 @@
         }
     }
 
-    public int size() {
+    int size() {
         return publishedFiles.size();
     }
 
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 57edc36..0fef9ab 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -1,24 +1,46 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
  */
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
 import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
@@ -26,17 +48,30 @@
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpStatus;
+
 import reactor.core.publisher.Mono;
 
 /**
+ * Publishes a file to the DataRouter.
+ *
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
  */
 public class DataRouterPublisher {
+    private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
+    private static final String CONTENT_TYPE = "application/octet-stream";
+    private static final String NAME_JSON_TAG = "name";
+    private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
+    private static final String PUBLISH_TOPIC = "publish";
+    private static final String DEFAULT_FEED_ID = "1";
 
     private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
     private final AppConfig datafileAppConfig;
+    private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
 
     public DataRouterPublisher(AppConfig datafileAppConfig) {
         this.datafileAppConfig = datafileAppConfig;
@@ -44,25 +79,70 @@
 
 
     /**
-     * Publish one file
-     * @param consumerDmaapModel information about the file to publish
-     * @param maxNumberOfRetries the maximal number of retries if the publishing fails
-     * @param firstBackoffTimeout the time to delay the first retry
+     * Publish one file.
+     *
+     * @param model information about the file to publish
+     * @param numRetries the maximal number of retries if the publishing fails
+     * @param firstBackoff the time to delay the first retry
      * @return the HTTP response status as a string
      */
     public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
             Map<String, String> contextMap) {
         MdcVariables.setMdcContextMap(contextMap);
         logger.trace("Method called with arg {}", model);
-        DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
+        dmaapProducerReactiveHttpClient = resolveClient();
 
-        //@formatter:off
         return Mono.just(model)
                 .cache()
-                .flatMap(m -> dmaapProducerReactiveHttpClient.getDmaapProducerResponse(m, contextMap))
-                .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap))
+                .flatMap(m -> publishFile(m, contextMap)) //
+                .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) //
                 .retryBackoff(numRetries, firstBackoff);
-        //@formatter:on
+    }
+
+    private Mono<HttpStatus> publishFile(ConsumerDmaapModel consumerDmaapModel, Map<String, String> contextMap) {
+        logger.trace("Entering publishFile with {}", consumerDmaapModel);
+        try {
+            HttpPut put = new HttpPut();
+            String requestId = MDC.get(REQUEST_ID);
+            put.addHeader(X_ONAP_REQUEST_ID, requestId);
+            String invocationId = UUID.randomUUID().toString();
+            put.addHeader(X_INVOCATION_ID, invocationId);
+
+            prepareHead(consumerDmaapModel, put);
+            prepareBody(consumerDmaapModel, put);
+            dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
+
+            HttpResponse response =
+                    dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, contextMap);
+            logger.trace(response.toString());
+            return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
+        } catch (Exception e) {
+            logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
+            return Mono.error(e);
+        }
+    }
+
+    private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
+        put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
+        JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+        metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
+        metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
+        put.addHeader(X_DMAAP_DR_META, metaData.toString());
+        put.setURI(getPublishUri(model.getInternalLocation().getFileName().toString()));
+    }
+
+    private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
+        Path fileLocation = model.getInternalLocation();
+        try (InputStream fileInputStream = createInputStream(fileLocation)) {
+            put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+        }
+    }
+
+    private URI getPublishUri(String fileName) {
+        return dmaapProducerReactiveHttpClient.getBaseUri() //
+                .pathSegment(PUBLISH_TOPIC) //
+                .pathSegment(DEFAULT_FEED_ID) //
+                .pathSegment(fileName).build();
     }
 
     private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model,
@@ -77,6 +157,10 @@
         }
     }
 
+    InputStream createInputStream(Path filePath) throws IOException {
+        FileSystemResource realResource = new FileSystemResource(filePath);
+        return realResource.getInputStream();
+    }
 
     DmaapPublisherConfiguration resolveConfiguration() {
         return datafileAppConfig.getDmaapPublisherConfiguration();
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index af4670e..fb27a57 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -20,6 +20,7 @@
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.Map;
+
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -33,6 +34,7 @@
 import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import reactor.core.publisher.Mono;
 
 /**
@@ -52,12 +54,10 @@
         MdcVariables.setMdcContextMap(contextMap);
         logger.trace("Entering execute with {}", fileData);
 
-        //@formatter:off
-        return Mono.just(fileData)
-            .cache()
-            .flatMap(fd -> collectFile(fileData, metaData, contextMap))
-            .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
-        //@formatter:on
+        return Mono.just(fileData) //
+                .cache() //
+                .flatMap(fd -> collectFile(fileData, metaData, contextMap)) //
+                .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
     }
 
     private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData,
@@ -92,22 +92,20 @@
     private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) {
         String location = fileData.location();
 
-        // @formatter:off
-        return ImmutableConsumerDmaapModel.builder()
-                .productName(metaData.productName())
-                .vendorName(metaData.vendorName())
-                .lastEpochMicrosec(metaData.lastEpochMicrosec())
-                .sourceName(metaData.sourceName())
-                .startEpochMicrosec(metaData.startEpochMicrosec())
-                .timeZoneOffset(metaData.timeZoneOffset())
-                .name(fileData.name())
-                .location(location)
-                .internalLocation(localFile.toString())
-                .compression(fileData.compression())
-                .fileFormatType(fileData.fileFormatType())
-                .fileFormatVersion(fileData.fileFormatVersion())
+        return ImmutableConsumerDmaapModel.builder() //
+                .productName(metaData.productName()) //
+                .vendorName(metaData.vendorName()) //
+                .lastEpochMicrosec(metaData.lastEpochMicrosec()) //
+                .sourceName(metaData.sourceName()) //
+                .startEpochMicrosec(metaData.startEpochMicrosec()) //
+                .timeZoneOffset(metaData.timeZoneOffset()) //
+                .name(fileData.name()) //
+                .location(location) //
+                .internalLocation(localFile) //
+                .compression(fileData.compression()) //
+                .fileFormatType(fileData.fileFormatType()) //
+                .fileFormatVersion(fileData.fileFormatVersion()) //
                 .build();
-        // @formatter:on
     }
 
     SftpClient createSftpClient(FileData fileData) throws DatafileTaskException {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
new file mode 100644
index 0000000..0729caa
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
@@ -0,0 +1,119 @@
+/*-
+* ============LICENSE_START=======================================================
+*  Copyright (C) 2019 Nordix Foundation.
+* ================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* SPDX-License-Identifier: Apache-2.0
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+/**
+ * Bean used to check with DataRouter if a file has been published.
+ *
+ * @author <a href="mailto:maxime.bonneau@est.tech">Maxime Bonneau</a>
+ *
+ */
+public class PublishedChecker {
+    private static final String FEEDLOG_TOPIC = "feedlog";
+    private static final String DEFAULT_FEED_ID = "1";
+
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private AppConfig appConfig;
+
+    /**
+     * Constructor.
+     *
+     * @param appConfig The DFC configuration.
+     */
+    public PublishedChecker(AppConfig appConfig) {
+        this.appConfig = appConfig;
+    }
+
+    /**
+     * Checks with DataRouter if the given file has been published already.
+     *
+     * @param fileName the name of the file used when it is published.
+     *
+     * @return <code>true</code> if the file has been published before, <code>false</code> otherwise.
+     */
+    public boolean execute(String fileName, Map<String, String> contextMap) {
+        MdcVariables.setMdcContextMap(contextMap);
+        DmaapProducerReactiveHttpClient producerClient = resolveClient();
+
+        HttpGet getRequest = new HttpGet();
+        String requestId = MDC.get(REQUEST_ID);
+        getRequest.addHeader(X_ONAP_REQUEST_ID, requestId);
+        String invocationId = UUID.randomUUID().toString();
+        getRequest.addHeader(X_INVOCATION_ID, invocationId);
+        getRequest.setURI(getPublishedQueryUri(fileName, producerClient));
+        producerClient.addUserCredentialsToHead(getRequest);
+
+        try {
+            HttpResponse response =
+                    producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, 2000, contextMap);
+
+            logger.trace(response.toString());
+            int status = response.getStatusLine().getStatusCode();
+            HttpEntity entity = response.getEntity();
+            InputStream content = entity.getContent();
+            String body = IOUtils.toString(content);
+            return HttpStatus.SC_OK == status && !"[]".equals(body);
+        } catch (Exception e) {
+            logger.warn("Unable to check if file has been published.", e);
+            return false;
+        }
+    }
+
+    private URI getPublishedQueryUri(String fileName, DmaapProducerReactiveHttpClient producerClient) {
+        return producerClient.getBaseUri() //
+                .pathSegment(FEEDLOG_TOPIC) //
+                .pathSegment(DEFAULT_FEED_ID) //
+                .queryParam("type", "pub") //
+                .queryParam("filename", fileName) //
+                .build();
+    }
+
+    protected DmaapPublisherConfiguration resolveConfiguration() {
+        return appConfig.getDmaapPublisherConfiguration();
+    }
+
+    protected DmaapProducerReactiveHttpClient resolveClient() {
+        return new DmaapProducerReactiveHttpClient(resolveConfiguration());
+    }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 2896337..d41e5c2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -1,4 +1,4 @@
-/*
+/*-
  * ============LICENSE_START======================================================================
  * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
@@ -18,13 +18,13 @@
 
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
@@ -37,14 +37,15 @@
 import org.slf4j.MDC;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
 
 /**
- * This implements the main flow of the data file collector. Fetch file ready events from the
- * message router, fetch new files from the PNF publish these in the data router.
+ * This implements the main flow of the data file collector. Fetch file ready events from the message router, fetch new
+ * files from the PNF publish these in the data router.
  */
 @Component
 public class ScheduledTasks {
@@ -52,7 +53,7 @@
     private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
     private static final int MAX_ILDLE_THREAD_TIME_TO_LIVE_SECONDS = 10;
 
-    /** Data needed for fetching of one file */
+    /** Data needed for fetching of one file. */
     private class FileCollectionData {
         final FileData fileData;
         final MessageMetaData metaData;
@@ -64,6 +65,7 @@
     }
 
     private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+
     private final AppConfig applicationConfiguration;
     private final AtomicInteger currentNumberOfTasks = new AtomicInteger();
     private final Scheduler scheduler =
@@ -96,17 +98,17 @@
                 .parallel(getParallelism()) // Each FileReadyMessage in a separate thread
                 .runOn(scheduler) //
                 .flatMap(this::createFileCollectionTask) //
-                .filter(this::shouldBePublished) //
+                .filter(fileData -> shouldBePublished(fileData, contextMap)) //
                 .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
                 .flatMap(fileData -> collectFileFromXnf(fileData, contextMap)) //
                 .flatMap(model -> publishToDataRouter(model, contextMap)) //
-                .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()), contextMap)) //
+                .doOnNext(model -> deleteFile(model.getInternalLocation(), contextMap)) //
                 .doOnNext(model -> currentNumberOfTasks.decrementAndGet()) //
                 .sequential();
     }
 
     /**
-     * called in regular intervals to remove out-dated cached information
+     * called in regular intervals to remove out-dated cached information.
      */
     public void purgeCachedInformation(Instant now) {
         alreadyPublishedFiles.purge(now);
@@ -144,8 +146,13 @@
         return Flux.fromIterable(fileCollects);
     }
 
-    private boolean shouldBePublished(FileCollectionData task) {
-        return alreadyPublishedFiles.put(task.fileData.getLocalFileName()) == null;
+    private boolean shouldBePublished(FileCollectionData task, Map<String, String> contextMap) {
+        boolean result = false;
+        Path localFileName = task.fileData.getLocalFileName();
+        if (alreadyPublishedFiles.put(localFileName) == null) {
+            result = !createPublishedChecker().execute(localFileName.getFileName().toString(), contextMap);
+        }
+        return result;
     }
 
     private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect,
@@ -156,7 +163,7 @@
         MdcVariables.setMdcContextMap(contextMap);
         return createFileCollector()
                 .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout,
-                        contextMap)
+                contextMap)
                 .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, contextMap));
     }
 
@@ -174,10 +181,9 @@
         final long maxNumberOfRetries = 3;
         final Duration initialRetryTimeout = Duration.ofSeconds(5);
 
-        DataRouterPublisher publisherTask = createDataRouterPublisher();
 
         MdcVariables.setMdcContextMap(contextMap);
-        return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
+        return createDataRouterPublisher().execute(model, maxNumberOfRetries, initialRetryTimeout, contextMap)
                 .onErrorResume(exception -> handlePublishFailure(model, exception, contextMap));
     }
 
@@ -185,7 +191,7 @@
             Map<String, String> contextMap) {
         MdcVariables.setMdcContextMap(contextMap);
         logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
-        Path internalFileName = Paths.get(model.getInternalLocation());
+        Path internalFileName = model.getInternalLocation();
         deleteFile(internalFileName, contextMap);
         alreadyPublishedFiles.remove(internalFileName);
         currentNumberOfTasks.decrementAndGet();
@@ -223,6 +229,10 @@
         }
     }
 
+    PublishedChecker createPublishedChecker() {
+        return new PublishedChecker(applicationConfiguration);
+    }
+
     int getCurrentNumberOfTasks() {
         return currentNumberOfTasks.get();
     }
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
similarity index 100%
rename from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
rename to datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
index acae1e6..b67fac2 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
@@ -28,56 +28,49 @@
 
 
 class CloudConfigParserTest {
-
-    private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG =
-            //@formatter:on
-            new ImmutableDmaapConsumerConfiguration.Builder()
-                    .timeoutMs(-1)
-                    .dmaapHostName("message-router.onap.svc.cluster.local")
-                    .dmaapUserName("admin")
-                    .dmaapUserPassword("admin")
-                    .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT")
-                    .dmaapPortNumber(2222)
-                    .dmaapContentType("application/json")
-                    .messageLimit(-1)
-                    .dmaapProtocol("http")
-                    .consumerId("C12")
-                    .consumerGroup("OpenDCAE-c12")
-                    .trustStorePath("trustStorePath")
-                    .trustStorePasswordPath("trustStorePasswordPath")
-                    .keyStorePath("keyStorePath")
-                    .keyStorePasswordPath("keyStorePasswordPath")
-                    .enableDmaapCertAuth(true)
+    private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
+            new ImmutableDmaapConsumerConfiguration.Builder() //
+                    .timeoutMs(-1) //
+                    .dmaapHostName("message-router.onap.svc.cluster.local") //
+                    .dmaapUserName("admin") //
+                    .dmaapUserPassword("admin") //
+                    .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") //
+                    .dmaapPortNumber(2222) //
+                    .dmaapContentType("application/json") //
+                    .messageLimit(-1) //
+                    .dmaapProtocol("http") //
+                    .consumerId("C12") //
+                    .consumerGroup("OpenDCAE-c12") //
+                    .trustStorePath("trustStorePath") //
+                    .trustStorePasswordPath("trustStorePasswordPath") //
+                    .keyStorePath("keyStorePath") //
+                    .keyStorePasswordPath("keyStorePasswordPath") //
+                    .enableDmaapCertAuth(true) //
                     .build();
-            //@formatter:off
 
-    private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG =
-            //@formatter:on
-            new ImmutableDmaapPublisherConfiguration.Builder()
-                    .dmaapTopicName("publish")
-                    .dmaapUserPassword("dradmin")
-                    .dmaapPortNumber(3907)
-                    .dmaapProtocol("https")
-                    .dmaapContentType("application/json")
-                    .dmaapHostName("message-router.onap.svc.cluster.local")
-                    .dmaapUserName("dradmin")
-                    .trustStorePath("trustStorePath")
-                    .trustStorePasswordPath("trustStorePasswordPath")
-                    .keyStorePath("keyStorePath")
-                    .keyStorePasswordPath("keyStorePasswordPath")
-                    .enableDmaapCertAuth(true)
+    private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
+            new ImmutableDmaapPublisherConfiguration.Builder() //
+                    .dmaapTopicName("publish") //
+                    .dmaapUserPassword("dradmin") //
+                    .dmaapPortNumber(3907) //
+                    .dmaapProtocol("https") //
+                    .dmaapContentType("application/json") //
+                    .dmaapHostName("message-router.onap.svc.cluster.local") //
+                    .dmaapUserName("dradmin") //
+                    .trustStorePath("trustStorePath") //
+                    .trustStorePasswordPath("trustStorePasswordPath") //
+                    .keyStorePath("keyStorePath") //
+                    .keyStorePasswordPath("keyStorePasswordPath") //
+                    .enableDmaapCertAuth(true) //
                     .build();
-            //@formatter:off
 
-    private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION =
-            //@formatter:on
-            new ImmutableFtpesConfig.Builder()
-                    .keyCert("/config/ftpKey.jks")
-                    .keyPassword("secret")
-                    .trustedCA("config/cacerts")
-                    .trustedCAPassword("secret")
+    private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
+            new ImmutableFtpesConfig.Builder() //
+                    .keyCert("/config/ftpKey.jks") //
+                    .keyPassword("secret") //
+                    .trustedCA("config/cacerts") //
+                    .trustedCAPassword("secret") //
                     .build();
-            //@formatter:off
 
     private CloudConfigParser cloudConfigParser = new CloudConfigParser(getCloudConfigJsonObject());
 
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index f7b8329..b33180f 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -68,48 +68,47 @@
 
     @Test
     void whenPassingCorrectJson_oneFileReadyMessage() throws DmaapNotFoundException {
-        // @formatter:off
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
-                .name(PM_FILE_NAME)
-                .location(LOCATION)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+                .name(PM_FILE_NAME) //
+                .location(LOCATION) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatType(FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder()
-                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
-                .changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE)
-                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
-                .addAdditionalField(additionalField)
+        JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+                .changeIdentifier(CHANGE_IDENTIFIER) //
+                .changeType(CHANGE_TYPE) //
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+                .addAdditionalField(additionalField) //
                 .build();
 
-        MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
-                .productName(PRODUCT_NAME)
-                .vendorName(VENDOR_NAME)
-                .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
-                .sourceName(SOURCE_NAME)
-                .startEpochMicrosec(START_EPOCH_MICROSEC)
-                .timeZoneOffset(TIME_ZONE_OFFSET)
-                .changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE)
+        MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+                .productName(PRODUCT_NAME) //
+                .vendorName(VENDOR_NAME) //
+                .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+                .sourceName(SOURCE_NAME) //
+                .startEpochMicrosec(START_EPOCH_MICROSEC) //
+                .timeZoneOffset(TIME_ZONE_OFFSET) //
+                .changeIdentifier(CHANGE_IDENTIFIER) //
+                .changeType(CHANGE_TYPE) //
                 .build();
-        FileData expectedFileData = ImmutableFileData.builder()
-                .name(PM_FILE_NAME)
-                .location(LOCATION)
-                .scheme(Scheme.FTPS)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        FileData expectedFileData = ImmutableFileData.builder() //
+                .name(PM_FILE_NAME) //
+                .location(LOCATION) //
+                .scheme(Scheme.FTPS) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatType(FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
         List<FileData> files = new ArrayList<>();
         files.add(expectedFileData);
         FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
-                .pnfName(SOURCE_NAME)
-                .messageMetaData(messageMetaData)
-                .files(files)
+                .pnfName(SOURCE_NAME) //
+                .messageMetaData(messageMetaData) //
+                .files(files) //
                 .build();
-        // @formatter:on
+
         String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -123,48 +122,47 @@
 
     @Test
     void whenPassingCorrectJsonWithTwoEvents_twoMessages() throws DmaapNotFoundException {
-        // @formatter:off
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
-                .name(PM_FILE_NAME)
-                .location(LOCATION)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+                .name(PM_FILE_NAME) //
+                .location(LOCATION) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatType(FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder()
-                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
-                .changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE)
-                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
-                .addAdditionalField(additionalField)
+        JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+                .changeIdentifier(CHANGE_IDENTIFIER) //
+                .changeType(CHANGE_TYPE) //
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+                .addAdditionalField(additionalField) //
                 .build();
 
-        MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
-                .productName(PRODUCT_NAME)
-                .vendorName(VENDOR_NAME)
-                .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
-                .sourceName(SOURCE_NAME)
-                .startEpochMicrosec(START_EPOCH_MICROSEC)
-                .timeZoneOffset(TIME_ZONE_OFFSET)
-                .changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE)
+        MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+                .productName(PRODUCT_NAME) //
+                .vendorName(VENDOR_NAME) //
+                .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+                .sourceName(SOURCE_NAME) //
+                .startEpochMicrosec(START_EPOCH_MICROSEC) //
+                .timeZoneOffset(TIME_ZONE_OFFSET) //
+                .changeIdentifier(CHANGE_IDENTIFIER) //
+                .changeType(CHANGE_TYPE) //
                 .build();
-        FileData expectedFileData = ImmutableFileData.builder()
-                .name(PM_FILE_NAME)
-                .location(LOCATION)
-                .scheme(Scheme.FTPS)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        FileData expectedFileData = ImmutableFileData.builder() //
+                .name(PM_FILE_NAME) //
+                .location(LOCATION) //
+                .scheme(Scheme.FTPS) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatType(FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
         List<FileData> files = new ArrayList<>();
         files.add(expectedFileData);
-        FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
-                .pnfName(SOURCE_NAME)
-                .messageMetaData(messageMetaData)
-                .files(files)
+        FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
+                .pnfName(SOURCE_NAME) //
+                .messageMetaData(messageMetaData) //
+                .files(files) //
                 .build();
-        // @formatter:on
+
         String parsedString = message.getParsed();
         String messageString = "[" + parsedString + "," + parsedString + "]";
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -178,21 +176,20 @@
 
     @Test
     void whenPassingCorrectJsonWithoutLocation_noMessage() {
-        // @formatter:off
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
-                .name(PM_FILE_NAME)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+                .name(PM_FILE_NAME) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatType(FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder()
-                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
-                .changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE)
-                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
-                .addAdditionalField(additionalField)
+        JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+                .changeIdentifier(CHANGE_IDENTIFIER) //
+                .changeType(CHANGE_TYPE) //
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+                .addAdditionalField(additionalField) //
                 .build();
-        // @formatter:on
+
         String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -206,48 +203,47 @@
 
     @Test
     void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException {
-        // @formatter:off
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
-                .name(PM_FILE_NAME)
-                .location(LOCATION)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+                .name(PM_FILE_NAME) //
+                .location(LOCATION) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatType(FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder()
-                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
-                .changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE)
-                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
-                .addAdditionalField(additionalField)
+        JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+                .changeIdentifier(CHANGE_IDENTIFIER) //
+                .changeType(CHANGE_TYPE) //
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+                .addAdditionalField(additionalField) //
                 .build();
 
-        MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
-                .productName(PRODUCT_NAME)
-                .vendorName(VENDOR_NAME)
-                .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
-                .sourceName(SOURCE_NAME)
-                .startEpochMicrosec(START_EPOCH_MICROSEC)
-                .timeZoneOffset(TIME_ZONE_OFFSET)
-                .changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE)
+        MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+                .productName(PRODUCT_NAME) //
+                .vendorName(VENDOR_NAME) //
+                .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+                .sourceName(SOURCE_NAME) //
+                .startEpochMicrosec(START_EPOCH_MICROSEC) //
+                .timeZoneOffset(TIME_ZONE_OFFSET) //
+                .changeIdentifier(CHANGE_IDENTIFIER) //
+                .changeType(CHANGE_TYPE) //
                 .build();
-        FileData expectedFileData = ImmutableFileData.builder()
-                .name(PM_FILE_NAME)
-                .location(LOCATION)
-                .scheme(Scheme.FTPS)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        FileData expectedFileData = ImmutableFileData.builder() //
+                .name(PM_FILE_NAME) //
+                .location(LOCATION) //
+                .scheme(Scheme.FTPS) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatType(FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
         List<FileData> files = new ArrayList<>();
         files.add(expectedFileData);
-        FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
-                .pnfName(SOURCE_NAME)
-                .messageMetaData(messageMetaData)
-                .files(files)
+        FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
+                .pnfName(SOURCE_NAME) //
+                .messageMetaData(messageMetaData) //
+                .files(files) //
                 .build();
-        // @formatter:on
+
         String parsedString = message.getParsed();
         String messageString = "[{\"event\":{}}," + parsedString + "]";
         JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser();
@@ -258,21 +254,20 @@
 
     @Test
     void whenPassingCorrectJsonWithFaultyEventName_noFileData() {
-        // @formatter:off
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
-                .location(LOCATION)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+                .location(LOCATION) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatType(FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder()
-                .eventName("Faulty event name")
-                .changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE)
-                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
-                .addAdditionalField(additionalField)
+        JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+                .eventName("Faulty event name") //
+                .changeIdentifier(CHANGE_IDENTIFIER) //
+                .changeType(CHANGE_TYPE) //
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+                .addAdditionalField(additionalField) //
                 .build();
-        // @formatter:on
+
         String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -286,21 +281,20 @@
 
     @Test
     void whenPassingCorrectJsonWithoutName_noFileData() {
-        // @formatter:off
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
-                .location(LOCATION)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+                .location(LOCATION) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatType(FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder()
-                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
-                .changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE)
-                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
-                .addAdditionalField(additionalField)
+        JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+                .changeIdentifier(CHANGE_IDENTIFIER) //
+                .changeType(CHANGE_TYPE) //
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+                .addAdditionalField(additionalField) //
                 .build();
-        // @formatter:on
+
         String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -314,14 +308,13 @@
 
     @Test
     void whenPassingCorrectJsonWithoutAdditionalFields_noFileData() {
-        // @formatter:off
-        JsonMessage message = new JsonMessage.JsonMessageBuilder()
-                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
-                .changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE)
-                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+        JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+                .changeIdentifier(CHANGE_IDENTIFIER) //
+                .changeType(CHANGE_TYPE) //
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
                 .build();
-        // @formatter:on
+
         String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -335,21 +328,20 @@
 
     @Test
     void whenPassingCorrectJsonWithoutCompression_noFileData() {
-        // @formatter:off
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
-                .name(PM_FILE_NAME)
-                .location(LOCATION)
-                .fileFormatType(FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+                .name(PM_FILE_NAME) //
+                .location(LOCATION) //
+                .fileFormatType(FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder()
-                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
-                .changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE)
-                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
-                .addAdditionalField(additionalField)
+        JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+                .changeIdentifier(CHANGE_IDENTIFIER) //
+                .changeType(CHANGE_TYPE) //
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+                .addAdditionalField(additionalField) //
                 .build();
-        // @formatter:on
+
         String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -363,21 +355,20 @@
 
     @Test
     void whenPassingCorrectJsonWithoutFileFormatType_noFileData() {
-        // @formatter:off
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
-                .name(PM_FILE_NAME)
-                .location(LOCATION)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+                .name(PM_FILE_NAME) //
+                .location(LOCATION) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder()
-                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
-                .changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE)
-                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
-                .addAdditionalField(additionalField)
+        JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+                .changeIdentifier(CHANGE_IDENTIFIER) //
+                .changeType(CHANGE_TYPE) //
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+                .addAdditionalField(additionalField) //
                 .build();
-        // @formatter:on
+
         String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -391,55 +382,54 @@
 
     @Test
     void whenPassingOneCorrectJsonWithoutFileFormatVersionAndOneCorrect_oneFileData() {
-        // @formatter:off
-        AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder()
-                .name(PM_FILE_NAME)
-                .location(LOCATION)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(FILE_FORMAT_TYPE)
+        AdditionalField additionalFaultyField = new JsonMessage.AdditionalFieldBuilder() //
+                .name(PM_FILE_NAME) //
+                .location(LOCATION) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatType(FILE_FORMAT_TYPE) //
                 .build();
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
-                .name(PM_FILE_NAME)
-                .location(LOCATION)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+                .name(PM_FILE_NAME) //
+                .location(LOCATION) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatType(FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder()
-                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
-                .changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE)
-                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
-                .addAdditionalField(additionalFaultyField)
-                .addAdditionalField(additionalField)
+        JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+                .changeIdentifier(CHANGE_IDENTIFIER) //
+                .changeType(CHANGE_TYPE) //
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+                .addAdditionalField(additionalFaultyField) //
+                .addAdditionalField(additionalField) //
                 .build();
 
-        MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
-                .productName(PRODUCT_NAME)
-                .vendorName(VENDOR_NAME)
-                .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
-                .sourceName(SOURCE_NAME)
-                .startEpochMicrosec(START_EPOCH_MICROSEC)
-                .timeZoneOffset(TIME_ZONE_OFFSET)
-                .changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE)
+        MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+                .productName(PRODUCT_NAME) //
+                .vendorName(VENDOR_NAME) //
+                .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+                .sourceName(SOURCE_NAME) //
+                .startEpochMicrosec(START_EPOCH_MICROSEC) //
+                .timeZoneOffset(TIME_ZONE_OFFSET) //
+                .changeIdentifier(CHANGE_IDENTIFIER) //
+                .changeType(CHANGE_TYPE) //
                 .build();
-        FileData expectedFileData = ImmutableFileData.builder()
-                .name(PM_FILE_NAME)
-                .location(LOCATION)
-                .scheme(Scheme.FTPS)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        FileData expectedFileData = ImmutableFileData.builder() //
+                .name(PM_FILE_NAME) //
+                .location(LOCATION) //
+                .scheme(Scheme.FTPS) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatType(FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
         List<FileData> files = new ArrayList<>();
         files.add(expectedFileData);
-        FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
-                .pnfName(SOURCE_NAME)
-                .messageMetaData(messageMetaData)
-                .files(files)
+        FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder() //
+                .pnfName(SOURCE_NAME) //
+                .messageMetaData(messageMetaData) //
+                .files(files) //
                 .build();
-        // @formatter:on
+
         String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -453,14 +443,13 @@
 
     @Test
     void whenPassingJsonWithoutMandatoryHeaderInformation_noFileData() {
-        // @formatter:off
-        JsonMessage message = new JsonMessage.JsonMessageBuilder()
-                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
-                .changeIdentifier("PM_MEAS_FILES_INVALID")
-                .changeType("FileReady_INVALID")
-                .notificationFieldsVersion("1.0_INVALID")
+        JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+                .changeIdentifier("PM_MEAS_FILES_INVALID") //
+                .changeType("FileReady_INVALID") //
+                .notificationFieldsVersion("1.0_INVALID") //
                 .build();
-        // @formatter:on
+
         String incorrectMessageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -486,21 +475,20 @@
 
     @Test
     void whenPassingCorrectJsonWithIncorrectChangeType_noFileData() {
-        // @formatter:off
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
-                .name(PM_FILE_NAME)
-                .location(LOCATION)
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+                .name(PM_FILE_NAME) //
+                .location(LOCATION) //
                 .compression(GZIP_COMPRESSION)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder()
-                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
-                .changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(INCORRECT_CHANGE_TYPE)
-                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
-                .addAdditionalField(additionalField)
+        JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+                .changeIdentifier(CHANGE_IDENTIFIER) //
+                .changeType(INCORRECT_CHANGE_TYPE) //
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+                .addAdditionalField(additionalField) //
                 .build();
-        // @formatter:on
+
         String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
@@ -514,21 +502,20 @@
 
     @Test
     void whenPassingCorrectJsonWithIncorrectChangeIdentifier_noFileData() {
-        // @formatter:off
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
-                .name(PM_FILE_NAME)
-                .location(LOCATION)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
+                .name(PM_FILE_NAME) //
+                .location(LOCATION) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder()
-                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
-                .changeIdentifier(INCORRECT_CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE)
-                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
-                .addAdditionalField(additionalField)
+        JsonMessage message = new JsonMessage.JsonMessageBuilder() //
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+                .changeIdentifier(INCORRECT_CHANGE_IDENTIFIER) //
+                .changeType(CHANGE_TYPE) //
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION) //
+                .addAdditionalField(additionalField) //
                 .build();
-        // @formatter:on
+
         String messageString = message.toString();
         String parsedString = message.getParsed();
         JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
index f88e301..a695e20 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
@@ -28,6 +28,8 @@
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -74,7 +76,7 @@
     private static final String PORT_22 = "22";
     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
     private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
-    private static final String LOCAL_FILE_LOCATION = "target/" + PM_FILE_NAME;
+    private static final Path LOCAL_FILE_LOCATION = Paths.get("target/" + PM_FILE_NAME);
     private static final String FTPES_LOCATION = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
     private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
     private static final String GZIP_COMPRESSION = "gzip";
@@ -86,7 +88,7 @@
     private static AppConfig appConfig;
     private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
     private DMaaPMessageConsumerTask messageConsumerTask;
-    private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
+    private DMaaPConsumerReactiveHttpClient httpClientMock;
 
     private static String ftpesMessageString;
     private static FileData ftpesFileData;
@@ -96,156 +98,163 @@
     private static FileData sftpFileData;
     private static FileReadyMessage expectedSftpMessage;
 
+    /**
+     * Sets up data for the test.
+     */
     @BeforeAll
     public static void setUp() {
-        //@formatter:off
-        dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder()
-                .consumerGroup("OpenDCAE-c12")
-                .consumerId("c12")
-                .dmaapContentType("application/json")
-                .dmaapHostName("54.45.33.2")
-                .dmaapPortNumber(1234).dmaapProtocol("https")
-                .dmaapUserName("Datafile")
-                .dmaapUserPassword("Datafile")
-                .dmaapTopicName("unauthenticated.NOTIFICATION")
-                .timeoutMs(-1)
-                .messageLimit(-1)
-                .trustStorePath("trustStorePath")
-                .trustStorePasswordPath("trustStorePasswordPath")
-                .keyStorePath("keyStorePath")
-                .keyStorePasswordPath("keyStorePasswordPath")
-                .enableDmaapCertAuth(true)
+        dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder() //
+                .consumerGroup("OpenDCAE-c12") //
+                .consumerId("c12") //
+                .dmaapContentType("application/json") //
+                .dmaapHostName("54.45.33.2") //
+                .dmaapPortNumber(1234).dmaapProtocol("https") //
+                .dmaapUserName("Datafile") //
+                .dmaapUserPassword("Datafile") //
+                .dmaapTopicName("unauthenticated.NOTIFICATION") //
+                .timeoutMs(-1) //
+                .messageLimit(-1) //
+                .trustStorePath("trustStorePath") //
+                .trustStorePasswordPath("trustStorePasswordPath") //
+                .keyStorePath("keyStorePath") //
+                .keyStorePasswordPath("keyStorePasswordPath") //
+                .enableDmaapCertAuth(true) //
                 .build();
 
         appConfig = mock(AppConfig.class);
 
-        AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder()
-                .location(FTPES_LOCATION)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
+                .location(FTPES_LOCATION) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
 
-        JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder()
-                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
-                .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
-                .changeType(FILE_READY_CHANGE_TYPE)
-                .notificationFieldsVersion("1.0")
-                .addAdditionalField(ftpesAdditionalField)
+        JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder() //
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+                .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+                .changeType(FILE_READY_CHANGE_TYPE) //
+                .notificationFieldsVersion("1.0") //
+                .addAdditionalField(ftpesAdditionalField) //
                 .build();
 
         ftpesMessageString = ftpesJsonMessage.toString();
-        MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
-                .productName(PRODUCT_NAME)
-                .vendorName(VENDOR_NAME)
-                .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
-                .sourceName(SOURCE_NAME)
-                .startEpochMicrosec(START_EPOCH_MICROSEC)
-                .timeZoneOffset(TIME_ZONE_OFFSET)
-                .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
-                .changeType(FILE_READY_CHANGE_TYPE)
+        MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+                .productName(PRODUCT_NAME) //
+                .vendorName(VENDOR_NAME) //
+                .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+                .sourceName(SOURCE_NAME) //
+                .startEpochMicrosec(START_EPOCH_MICROSEC) //
+                .timeZoneOffset(TIME_ZONE_OFFSET) //
+                .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+                .changeType(FILE_READY_CHANGE_TYPE) //
                 .build();
-        ftpesFileData = ImmutableFileData.builder()
-                .name(PM_FILE_NAME)
-                .location(FTPES_LOCATION)
-                .scheme(Scheme.FTPS)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        ftpesFileData = ImmutableFileData.builder() //
+                .name(PM_FILE_NAME) //
+                .location(FTPES_LOCATION) //
+                .scheme(Scheme.FTPS) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
 
         List<FileData> files = new ArrayList<>();
         files.add(ftpesFileData);
-        expectedFtpesMessage = ImmutableFileReadyMessage.builder()
-                .pnfName(SOURCE_NAME)
-                .messageMetaData(messageMetaData)
-                .files(files)
+        expectedFtpesMessage = ImmutableFileReadyMessage.builder() //
+                .pnfName(SOURCE_NAME) //
+                .messageMetaData(messageMetaData) //
+                .files(files) //
                 .build();
 
-        AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder()
-                .location(SFTP_LOCATION)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
+                .location(SFTP_LOCATION) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
-        JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder()
-                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
-                .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
-                .changeType(FILE_READY_CHANGE_TYPE)
-                .notificationFieldsVersion("1.0")
-                .addAdditionalField(sftpAdditionalField)
+        JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder() //
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+                .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+                .changeType(FILE_READY_CHANGE_TYPE) //
+                .notificationFieldsVersion("1.0") //
+                .addAdditionalField(sftpAdditionalField) //
                 .build();
         sftpMessageString = sftpJsonMessage.toString();
-        sftpFileData = ImmutableFileData.builder()
-                .name(PM_FILE_NAME)
-                .location(SFTP_LOCATION)
-                .scheme(Scheme.FTPS)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        sftpFileData = ImmutableFileData.builder() //
+                .name(PM_FILE_NAME) //
+                .location(SFTP_LOCATION) //
+                .scheme(Scheme.FTPS) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
 
-        ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
-                .productName(PRODUCT_NAME)
-                .vendorName(VENDOR_NAME)
-                .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
-                .sourceName(SOURCE_NAME)
-                .startEpochMicrosec(START_EPOCH_MICROSEC)
-                .timeZoneOffset(TIME_ZONE_OFFSET)
-                .name(PM_FILE_NAME)
-                .location(FTPES_LOCATION)
-                .internalLocation(LOCAL_FILE_LOCATION)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
+                .productName(PRODUCT_NAME) //
+                .vendorName(VENDOR_NAME) //
+                .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+                .sourceName(SOURCE_NAME) //
+                .startEpochMicrosec(START_EPOCH_MICROSEC) //
+                .timeZoneOffset(TIME_ZONE_OFFSET) //
+                .name(PM_FILE_NAME) //
+                .location(FTPES_LOCATION) //
+                .internalLocation(LOCAL_FILE_LOCATION) //
+                .compression(GZIP_COMPRESSION) //
+                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
         listOfConsumerDmaapModel.add(consumerDmaapModel);
 
         files = new ArrayList<>();
         files.add(sftpFileData);
-        expectedSftpMessage = ImmutableFileReadyMessage.builder()
-                .pnfName(SOURCE_NAME)
-                .messageMetaData(messageMetaData)
-                .files(files)
+        expectedSftpMessage = ImmutableFileReadyMessage.builder() //
+                .pnfName(SOURCE_NAME) //
+                .messageMetaData(messageMetaData) //
+                .files(files) //
                 .build();
-        //@formatter:on
     }
 
     @Test
     public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
         prepareMocksForDmaapConsumer("", null);
 
-        StepVerifier.create(messageConsumerTask.execute()).expectSubscription()
-                .expectError(DatafileTaskException.class).verify();
+        StepVerifier.create(messageConsumerTask.execute()) //
+                .expectSubscription() //
+                .expectError(DatafileTaskException.class) //
+                .verify();
 
-        verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
+        verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
     }
 
     @Test
     public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
         prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage);
 
-        StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedFtpesMessage).verifyComplete();
+        StepVerifier.create(messageConsumerTask.execute()) //
+                .expectNext(expectedFtpesMessage) //
+                .verifyComplete();
 
-        verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
-        verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
+        verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
+        verifyNoMoreInteractions(httpClientMock);
     }
 
     @Test
     public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
         prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage);
 
-        StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedSftpMessage).verifyComplete();
+        StepVerifier.create(messageConsumerTask.execute()) //
+                .expectNext(expectedSftpMessage) //
+                .verifyComplete();
 
-        verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
-        verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
+        verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
+        verifyNoMoreInteractions(httpClientMock);
     }
 
     private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) {
         Mono<String> messageAsMono = Mono.just(message);
         JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
-        dmaapConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
-        when(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
+        httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class);
+        when(httpClientMock.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
 
         if (!message.isEmpty()) {
             when(jsonMessageParserMock.getMessagesFromJson(messageAsMono))
@@ -255,9 +264,8 @@
                     .thenReturn(Flux.error(new DatafileTaskException("problemas")));
         }
 
-        messageConsumerTask =
-                spy(new DMaaPMessageConsumerTask(appConfig, dmaapConsumerReactiveHttpClient, jsonMessageParserMock));
+        messageConsumerTask = spy(new DMaaPMessageConsumerTask(appConfig, httpClientMock, jsonMessageParserMock));
         when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
-        doReturn(dmaapConsumerReactiveHttpClient).when(messageConsumerTask).resolveClient();
+        doReturn(httpClientMock).when(messageConsumerTask).resolveClient();
     }
 }
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index d612d17..ed8b93f 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -1,4 +1,4 @@
-/*
+/*-
  * ============LICENSE_START======================================================================
  * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
@@ -16,8 +16,9 @@
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -25,19 +26,36 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+
+import org.apache.http.Header;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
 import org.springframework.http.HttpStatus;
-import reactor.core.publisher.Mono;
+import org.springframework.web.util.DefaultUriBuilderFactory;
+import org.springframework.web.util.UriBuilder;
+
 import reactor.test.StepVerifier;
 
 /**
@@ -52,31 +70,41 @@
     private static final String START_EPOCH_MICROSEC = "8745745764578";
     private static final String TIME_ZONE_OFFSET = "UTC+05:00";
     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+    private static final String FTPES_ADDRESS = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME;
+    private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + PM_FILE_NAME;
+
+    private static final String COMPRESSION = "gzip";
+    private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
+    private static final String FILE_FORMAT_VERSION = "V10";
+    private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
+
+    private static final String HOST = "54.45.33.2";
+    private static final String HTTPS_SCHEME = "https";
+    private static final int PORT = 1234;
+    private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
+    private static final String PUBLISH_TOPIC = "publish";
+    private static final String FEED_ID = "1";
+    private static final String FILE_CONTENT = "Just a string.";
+
+    private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
 
     private static ConsumerDmaapModel consumerDmaapModel;
-    private static DataRouterPublisher dmaapPublisherTask;
-    private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient;
+    private static DmaapProducerReactiveHttpClient httpClientMock;
     private static AppConfig appConfig;
-    private static DmaapPublisherConfiguration dmaapPublisherConfiguration;
+    private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
 
+    private static DataRouterPublisher publisherTaskUnderTestSpy;
+
+    /**
+     * Sets up data for tests.
+     */
     @BeforeAll
     public static void setUp() {
+        when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
+        when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
+        when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
 
-        dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder()
-                .dmaapContentType("application/json") //
-                .dmaapHostName("54.45.33.2") //
-                .dmaapPortNumber(1234) //
-                .dmaapProtocol("https") //
-                .dmaapUserName("DFC") //
-                .dmaapUserPassword("DFC") //
-                .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
-                .trustStorePath("trustStorePath") //
-                .trustStorePasswordPath("trustStorePasswordPath") //
-                .keyStorePath("keyStorePath") //
-                .keyStorePasswordPath("keyStorePasswordPath") //
-                .enableDmaapCertAuth(true) //
-                .build(); //
-        consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
+        consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
                 .productName(PRODUCT_NAME) //
                 .vendorName(VENDOR_NAME) //
                 .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
@@ -84,61 +112,145 @@
                 .startEpochMicrosec(START_EPOCH_MICROSEC) //
                 .timeZoneOffset(TIME_ZONE_OFFSET) //
                 .name(PM_FILE_NAME) //
-                .location("ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME) //
-                .internalLocation("target/" + PM_FILE_NAME) //
+                .location(FTPES_ADDRESS) //
+                .internalLocation(Paths.get("target/" + LOCAL_FILE_NAME)) //
                 .compression("gzip") //
-                .fileFormatType("org.3GPP.32.435#measCollec") //
-                .fileFormatVersion("V10") //
+                .fileFormatType(FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build(); //
         appConfig = mock(AppConfig.class);
-
-        doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
+        publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig));
     }
 
     @Test
-    public void whenPassedObjectFits_ReturnsCorrectStatus() {
-        prepareMocksForTests(Mono.just(HttpStatus.OK));
+    public void whenPassedObjectFits_ReturnsCorrectStatus() throws Exception {
+        prepareMocksForTests(null, Integer.valueOf(HttpStatus.OK.value()));
 
-        Map<String, String> contextMap = new HashMap<>();
-        StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
-                .expectNext(consumerDmaapModel).verifyComplete();
+        StepVerifier
+                .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+                .expectNext(consumerDmaapModel) //
+                .verifyComplete();
 
-        verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any(), eq(contextMap));
-        verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+        ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
+        verify(httpClientMock).getBaseUri();
+        verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
+        verify(httpClientMock).getDmaapProducerResponseWithRedirect(requestCaptor.capture(), any());
+        verifyNoMoreInteractions(httpClientMock);
+
+        HttpPut actualPut = (HttpPut) requestCaptor.getValue();
+        URI actualUri = actualPut.getURI();
+        assertEquals(HTTPS_SCHEME, actualUri.getScheme());
+        assertEquals(HOST, actualUri.getHost());
+        assertEquals(PORT, actualUri.getPort());
+        Path actualPath = Paths.get(actualUri.getPath());
+        assertTrue(PUBLISH_TOPIC.equals(actualPath.getName(0).toString()));
+        assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
+        assertTrue(LOCAL_FILE_NAME.equals(actualPath.getName(2).toString()));
+
+        Header[] contentHeaders = actualPut.getHeaders("content-type");
+        assertEquals(APPLICATION_OCTET_STREAM_CONTENT_TYPE, contentHeaders[0].getValue());
+
+        Header[] metaHeaders = actualPut.getHeaders(X_DMAAP_DR_META);
+        Map<String, String> metaHash = getMetaDataAsMap(metaHeaders);
+        assertTrue(10 == metaHash.size());
+        assertEquals(PRODUCT_NAME, metaHash.get("productName"));
+        assertEquals(VENDOR_NAME, metaHash.get("vendorName"));
+        assertEquals(LAST_EPOCH_MICROSEC, metaHash.get("lastEpochMicrosec"));
+        assertEquals(SOURCE_NAME, metaHash.get("sourceName"));
+        assertEquals(START_EPOCH_MICROSEC, metaHash.get("startEpochMicrosec"));
+        assertEquals(TIME_ZONE_OFFSET, metaHash.get("timeZoneOffset"));
+        assertEquals(COMPRESSION, metaHash.get("compression"));
+        assertEquals(FTPES_ADDRESS, metaHash.get("location"));
+        assertEquals(FILE_FORMAT_TYPE, metaHash.get("fileFormatType"));
+        assertEquals(FILE_FORMAT_VERSION, metaHash.get("fileFormatVersion"));
     }
 
     @Test
-    public void whenPassedObjectFits_firstFailsThenSucceeds() {
-        prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.OK));
+    void whenPassedObjectFits_firstFailsWithExceptionThenSucceeds() throws Exception {
+        prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value());
 
-        Map<String, String> contextMap = new HashMap<>();
-        StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
-                .expectNext(consumerDmaapModel).verifyComplete();
-
-        verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap));
-        verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+        StepVerifier
+                .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 2, Duration.ofSeconds(0), CONTEXT_MAP))
+                .expectNext(consumerDmaapModel) //
+                .verifyComplete();
     }
 
     @Test
-    public void whenPassedObjectFits_firstFailsThenFails() {
-        prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.BAD_GATEWAY));
+    public void whenPassedObjectFits_firstFailsThenSucceeds() throws Exception {
+        prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
+                Integer.valueOf(HttpStatus.OK.value()));
 
-        Map<String, String> contextMap = new HashMap<>();
-        StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), contextMap))
-                .expectErrorMessage("Retries exhausted: 1/1").verify();
+        StepVerifier
+                .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+                .expectNext(consumerDmaapModel) //
+                .verifyComplete();
 
-        verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any(), eq(contextMap));
-        verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+        verify(httpClientMock, times(2)).getBaseUri();
+        verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
+        verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
+        verifyNoMoreInteractions(httpClientMock);
+    }
+
+    @Test
+    public void whenPassedObjectFits_firstFailsThenFails() throws Exception {
+        prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
+                Integer.valueOf((HttpStatus.BAD_GATEWAY.value())));
+
+        StepVerifier
+                .create(publisherTaskUnderTestSpy.execute(consumerDmaapModel, 1, Duration.ofSeconds(0), CONTEXT_MAP))
+                .expectErrorMessage("Retries exhausted: 1/1") //
+                .verify();
+
+        verify(httpClientMock, times(2)).getBaseUri();
+        verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
+        verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
+        verifyNoMoreInteractions(httpClientMock);
     }
 
     @SafeVarargs
-    final void prepareMocksForTests(Mono<HttpStatus> firstResponse, Mono<HttpStatus>... nextHttpResponses) {
-        dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
-        when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any(), any())).thenReturn(firstResponse,
-                nextHttpResponses);
+    final void prepareMocksForTests(Exception exception, Integer firstResponse, Integer... nextHttpResponses)
+            throws Exception {
+        httpClientMock = mock(DmaapProducerReactiveHttpClient.class);
+        when(appConfig.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
+        doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration();
+        doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient();
 
-        dmaapPublisherTask = spy(new DataRouterPublisher(appConfig));
-        when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
-        doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
+        UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
+        when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+
+        HttpResponse httpResponseMock = mock(HttpResponse.class);
+        if (exception == null) {
+            when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()))
+                    .thenReturn(httpResponseMock);
+        } else {
+            when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()))
+                    .thenThrow(exception).thenReturn(httpResponseMock);
+        }
+        StatusLine statusLineMock = mock(StatusLine.class);
+        when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
+        when(statusLineMock.getStatusCode()).thenReturn(firstResponse, nextHttpResponses);
+
+        InputStream fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes());
+        doReturn(fileStream).when(publisherTaskUnderTestSpy).createInputStream(Paths.get("target", LOCAL_FILE_NAME));
+    }
+
+    private Map<String, String> getMetaDataAsMap(Header[] metaHeaders) {
+        Map<String, String> metaHash = new HashMap<>();
+        String actualMetaData = metaHeaders[0].getValue();
+        actualMetaData = actualMetaData.substring(1, actualMetaData.length() - 1);
+        actualMetaData = actualMetaData.replace("\"", "");
+        String[] commaSplitedMetaData = actualMetaData.split(",");
+        for (int i = 0; i < commaSplitedMetaData.length; i++) {
+            String[] keyValuePair = commaSplitedMetaData[i].split(":");
+            if (keyValuePair.length > 2) {
+                List<String> arrayKeyValuePair = new ArrayList<>(keyValuePair.length);
+                for (int j = 1; j < keyValuePair.length; j++) {
+                    arrayKeyValuePair.add(keyValuePair[j]);
+                }
+                keyValuePair[1] = String.join(":", arrayKeyValuePair);
+            }
+            metaHash.put(keyValuePair[0], keyValuePair[1]);
+        }
+        return metaHash;
     }
 }
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index c266d50..fb49c86 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -1,4 +1,4 @@
-/*
+/*-
  * ============LICENSE_START======================================================================
  * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
@@ -134,7 +134,7 @@
             .timeZoneOffset(TIME_ZONE_OFFSET)
             .name(PM_FILE_NAME)
             .location(location)
-            .internalLocation(LOCAL_FILE_LOCATION.toString())
+            .internalLocation(LOCAL_FILE_LOCATION)
             .compression(GZIP_COMPRESSION)
             .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
             .fileFormatVersion(FILE_FORMAT_VERSION)
@@ -161,7 +161,8 @@
         ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
 
         Map<String, String> contextMap = new HashMap<>();
-        StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+        StepVerifier.create(
+                collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
                 .expectNext(expectedConsumerDmaapModel).verifyComplete();
 
         verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -179,7 +180,9 @@
         ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION_NO_PORT);
 
         Map<String, String> contextMap = new HashMap<>();
-         StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+        StepVerifier
+                .create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0),
+                        contextMap))
                 .expectNext(expectedConsumerDmaapModel) //
                 .verifyComplete();
 
@@ -187,7 +190,9 @@
         fileData = createFileData(SFTP_LOCATION, Scheme.SFTP);
         expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(SFTP_LOCATION);
 
-        StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+        StepVerifier
+                .create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0),
+                        contextMap))
                 .expectNext(expectedConsumerDmaapModel) //
                 .verifyComplete();
 
@@ -206,7 +211,8 @@
                 .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
 
         Map<String, String> contextMap = new HashMap<>();
-        StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+        StepVerifier.create(
+                collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
                 .expectErrorMessage("Retries exhausted: 3/3").verify();
 
         verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -223,10 +229,10 @@
 
         FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS);
         Map<String, String> contextMap = new HashMap<>();
-        StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
+        StepVerifier.create(
+                collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0), contextMap))
                 .expectNext(expectedConsumerDmaapModel).verifyComplete();
 
         verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
     }
-
 }
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
new file mode 100644
index 0000000..3e3c2ed
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
@@ -0,0 +1,175 @@
+/*-
+* ============LICENSE_START=======================================================
+*  Copyright (C) 2019 Nordix Foundation.
+* ================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+* SPDX-License-Identifier: Apache-2.0
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.springframework.web.util.DefaultUriBuilderFactory;
+import org.springframework.web.util.UriBuilder;
+
+/**
+ * @author <a href="mailto:maxime.bonneau@est.tech">Maxime Bonneau</a>
+ *
+ */
+public class PublishedCheckerTest {
+    private static final String EMPTY_CONTENT = "[]";
+    private static final String FEEDLOG_TOPIC = "feedlog";
+    private static final String FEED_ID = "1";
+    private static final String HTTPS_SCHEME = "https";
+    private static final String HOST = "54.45.33.2";
+    private static final int PORT = 1234;
+    private static final String SOURCE_NAME = "oteNB5309";
+    private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
+    private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + FILE_NAME;
+
+    private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
+
+    private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+    private static AppConfig appConfigMock;
+    private DmaapProducerReactiveHttpClient httpClientMock = mock(DmaapProducerReactiveHttpClient.class);
+
+    private PublishedChecker publishedCheckerUnderTestSpy;
+
+    /**
+     * Sets up data for the tests.
+     */
+    @BeforeAll
+    public static void setUp() {
+        when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
+        when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
+        when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
+
+        appConfigMock = mock(AppConfig.class);
+        when(appConfigMock.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
+    }
+
+    @Test
+    public void executeWhenNotPublished_returnsFalse() throws Exception {
+        prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, null);
+
+        boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+        assertFalse(isPublished);
+
+        ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
+        verify(httpClientMock).getBaseUri();
+        verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
+        verify(httpClientMock).getDmaapProducerResponseWithCustomTimeout(requestCaptor.capture(), anyInt(), any());
+        verifyNoMoreInteractions(httpClientMock);
+
+        HttpUriRequest getRequest = requestCaptor.getValue();
+        assertTrue(getRequest instanceof HttpGet);
+        URI actualUri = getRequest.getURI();
+        assertEquals(HTTPS_SCHEME, actualUri.getScheme());
+        assertEquals(HOST, actualUri.getHost());
+        assertEquals(PORT, actualUri.getPort());
+        Path actualPath = Paths.get(actualUri.getPath());
+        assertTrue(FEEDLOG_TOPIC.equals(actualPath.getName(0).toString()));
+        assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
+        String actualQuery = actualUri.getQuery();
+        assertTrue(actualQuery.contains("type=pub"));
+        assertTrue(actualQuery.contains("filename=" + LOCAL_FILE_NAME));
+    }
+
+    @Test
+    public void executeWhenDataRouterReturnsNok_returnsFalse() throws Exception {
+        prepareMocksForTests(HttpUtils.SC_BAD_REQUEST, EMPTY_CONTENT, null);
+
+        boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+        assertFalse(isPublished);
+    }
+
+    @Test
+    public void executeWhenPublished_returnsTrue() throws Exception {
+        prepareMocksForTests(HttpUtils.SC_OK, "[" + LOCAL_FILE_NAME + "]", null);
+
+        boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+        assertTrue(isPublished);
+    }
+
+    @Test
+    public void executeWhenErrorInDataRouter_returnsFalse() throws Exception {
+        prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, new DatafileTaskException(""));
+
+        boolean isPublished = publishedCheckerUnderTestSpy.execute(LOCAL_FILE_NAME, CONTEXT_MAP);
+
+        assertFalse(isPublished);
+    }
+
+    final void prepareMocksForTests(int responseCode, String content, Exception exception) throws Exception {
+        publishedCheckerUnderTestSpy = spy(new PublishedChecker(appConfigMock));
+
+        doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration();
+        doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient();
+
+        UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
+        when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+
+        HttpResponse httpResponseMock = mock(HttpResponse.class);
+        if (exception == null) {
+            when(httpClientMock.getDmaapProducerResponseWithCustomTimeout(any(HttpUriRequest.class), anyInt(), any()))
+                    .thenReturn(httpResponseMock);
+        } else {
+            when(httpClientMock.getDmaapProducerResponseWithCustomTimeout(any(HttpUriRequest.class), anyInt(), any()))
+                    .thenThrow(exception);
+        }
+        HttpEntity httpEntityMock = mock(HttpEntity.class);
+        StatusLine statusLineMock = mock(StatusLine.class);
+        when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
+        when(statusLineMock.getStatusCode()).thenReturn(responseCode);
+        when(httpResponseMock.getEntity()).thenReturn(httpEntityMock);
+        InputStream stream = new ByteArrayInputStream(content.getBytes());
+        when(httpEntityMock.getContent()).thenReturn(stream);
+    }
+}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 8c4b389..d781cea 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -19,6 +19,7 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.notNull;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -26,9 +27,12 @@
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.LinkedList;
 import java.util.List;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
@@ -43,6 +47,7 @@
 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
@@ -56,6 +61,7 @@
 
     private int uniqueValue = 0;
     private DMaaPMessageConsumerTask consumerMock;
+    private PublishedChecker publishedCheckerMock;
     private FileCollector fileCollectorMock;
     private DataRouterPublisher dataRouterMock;
 
@@ -75,13 +81,15 @@
                 .keyStorePasswordPath("keyStorePasswordPath") //
                 .enableDmaapCertAuth(true) //
                 .build(); //
+        doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
 
         consumerMock = mock(DMaaPMessageConsumerTask.class);
+        publishedCheckerMock = mock(PublishedChecker.class);
         fileCollectorMock = mock(FileCollector.class);
         dataRouterMock = mock(DataRouterPublisher.class);
 
-        doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
         doReturn(consumerMock).when(testedObject).createConsumerTask();
+        doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
         doReturn(fileCollectorMock).when(testedObject).createFileCollector();
         doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
     }
@@ -146,7 +154,7 @@
                 .timeZoneOffset("") //
                 .name("") //
                 .location("") //
-                .internalLocation("internalLocation") //
+                .internalLocation(Paths.get("internalLocation")) //
                 .compression("") //
                 .fileFormatType("") //
                 .fileFormatVersion("") //
@@ -174,6 +182,8 @@
         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
         doReturn(fileReadyMessages).when(consumerMock).execute();
 
+        doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
         doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
         doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
@@ -197,6 +207,8 @@
         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
         doReturn(fileReadyMessages).when(consumerMock).execute();
 
+        doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
         Mono<Object> error = Mono.error(new Exception("problem"));
 
@@ -228,6 +240,8 @@
         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
         doReturn(fileReadyMessages).when(consumerMock).execute();
 
+        doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
         doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
 
@@ -260,6 +274,8 @@
         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
         doReturn(fileReadyMessages).when(consumerMock).execute();
 
+        doReturn(false).when(publishedCheckerMock).execute(anyString(), any());
+
         Mono<ConsumerDmaapModel> collectedFile = Mono.just(consumerData());
         doReturn(collectedFile).when(fileCollectorMock).execute(notNull(), notNull(), anyLong(), notNull(), any());
         doReturn(collectedFile).when(dataRouterMock).execute(notNull(), anyLong(), notNull(), any());
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
index 0eaa7a1..5e08efc 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
@@ -30,6 +30,6 @@
     }
 
     public DatafileTaskException(String message, Exception e) {
-        super(message + e);
+        super(message, e);
     }
 }
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
index 9f3a318..f115dba 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
@@ -20,15 +20,44 @@
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
 
+import java.lang.reflect.Type;
+import java.nio.file.Path;
 
+/**
+ * Helper class to serialize object.
+ */
 public class CommonFunctions {
 
-    private static Gson gson = new GsonBuilder().serializeNulls().create();
+    private static Gson gson =
+        new GsonBuilder().registerTypeHierarchyAdapter(Path.class, new PathConverter()).serializeNulls().create();
 
-    private CommonFunctions() {}
+    private CommonFunctions() {
+    }
 
+    /**
+     * Serializes a <code>ConsumerDmaapModel</code>.
+     *
+     * @param consumerDmaapModel model to serialize.
+     *
+     * @return a string with the serialized model.
+     */
     public static String createJsonBody(ConsumerDmaapModel consumerDmaapModel) {
         return gson.toJson(consumerDmaapModel);
     }
-}
\ No newline at end of file
+
+    /**
+     * Json serializer that handles Path serializations, since <code>Path</code> does not implement the
+     * <code>Serializable</code> interface.
+     */
+    public static class PathConverter implements JsonSerializer<Path> {
+        @Override
+        public JsonElement serialize(Path path, Type type, JsonSerializationContext jsonSerializationContext) {
+            return new JsonPrimitive(path.toString());
+        }
+    }
+}
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
index 972316b..2337485 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
@@ -17,6 +17,9 @@
 package org.onap.dcaegen2.collectors.datafile.model;
 
 import com.google.gson.annotations.SerializedName;
+
+import java.nio.file.Path;
+
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
 import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
@@ -55,7 +58,7 @@
     String getLocation();
 
     @SerializedName("internalLocation")
-    String getInternalLocation();
+    Path getInternalLocation();
 
     @SerializedName("compression")
     String getCompression();
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
index cbc3e12..25f0dbf 100644
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
+++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
@@ -1,58 +1,51 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
  */
 
 package org.onap.dcaegen2.collectors.datafile.model;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Paths;
 
 import org.junit.jupiter.api.Test;
 
-class CommonFunctionsTest {
-    // @formatter:off
-    private ConsumerDmaapModel model = ImmutableConsumerDmaapModel.builder()
-            .productName("NrRadio")
-            .vendorName("Ericsson")
-            .lastEpochMicrosec("8745745764578")
-            .sourceName("oteNB5309")
-            .startEpochMicrosec("8745745764578")
-            .timeZoneOffset("UTC+05:00")
-            .name("A20161224.1030-1045.bin.gz")
-            .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz")
-            .internalLocation("target/A20161224.1030-1045.bin.gz")
-            .compression("gzip")
-            .fileFormatType("org.3GPP.32.435#measCollec")
-            .fileFormatVersion("V10")
-            .build();
-
-    private static final String EXPECTED_RESULT =
-             "{\"productName\":\"NrRadio\","
-            + "\"vendorName\":\"Ericsson\","
-            + "\"lastEpochMicrosec\":\"8745745764578\","
-            + "\"sourceName\":\"oteNB5309\","
-            + "\"startEpochMicrosec\":\"8745745764578\","
-            + "\"timeZoneOffset\":\"UTC+05:00\","
-            + "\"name\":\"A20161224.1030-1045.bin.gz\","
-            + "\"location\":\"ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz\","
-            + "\"internalLocation\":\"target/A20161224.1030-1045.bin.gz\","
-            + "\"compression\":\"gzip\","
-            + "\"fileFormatType\":\"org.3GPP.32.435#measCollec\","
-            + "\"fileFormatVersion\":\"V10\"}";
-    // @formatter:on
+public class CommonFunctionsTest {
     @Test
-    void createJsonBody_shouldReturnJsonInString() {
-        assertEquals(EXPECTED_RESULT, CommonFunctions.createJsonBody(model));
+    public void createJsonBody_success() {
+        ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel //
+        .builder() //
+        .productName("") //
+        .vendorName("") //
+        .lastEpochMicrosec("") //
+        .sourceName("") //
+        .startEpochMicrosec("") //
+        .timeZoneOffset("") //
+        .name("") //
+        .location("") //
+        .internalLocation(Paths.get("internalLocation")) //
+        .compression("") //
+        .fileFormatType("") //
+        .fileFormatVersion("") //
+        .build();
+        String actualBody = CommonFunctions.createJsonBody(consumerDmaapModel);
+
+        assertTrue(actualBody.contains("\"internalLocation\":\"internalLocation\""));
     }
-}
\ No newline at end of file
+}
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
index 2c5e701..0c1ac43 100644
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
+++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
@@ -16,6 +16,9 @@
 
 package org.onap.dcaegen2.collectors.datafile.model;
 
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -28,29 +31,27 @@
     private static final String TIME_ZONE_OFFSET = "UTC+05:00";
     private static final String NAME = "A20161224.1030-1045.bin.gz";
     private static final String LOCATION = "ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz";
-    private static final String INTERNAL_LOCATION = "target/A20161224.1030-1045.bin.gz";
+    private static final Path INTERNAL_LOCATION = Paths.get("target/A20161224.1030-1045.bin.gz");
     private static final String COMPRESSION = "gzip";
     private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
     private static final String FILE_FORMAT_VERSION = "V10";
 
     @Test
     public void consumerDmaapModelBuilder_shouldBuildAnObject() {
-        // @formatter:off
-        ConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
-                .productName(PRODUCT_NAME)
-                .vendorName(VENDOR_NAME)
-                .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
-                .sourceName(SOURCE_NAME)
-                .startEpochMicrosec(START_EPOCH_MICROSEC)
-                .timeZoneOffset(TIME_ZONE_OFFSET)
-                .name(NAME)
-                .location(LOCATION)
-                .internalLocation(INTERNAL_LOCATION)
-                .compression(COMPRESSION)
-                .fileFormatType(FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
+        ConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
+                .productName(PRODUCT_NAME) //
+                .vendorName(VENDOR_NAME) //
+                .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+                .sourceName(SOURCE_NAME) //
+                .startEpochMicrosec(START_EPOCH_MICROSEC) //
+                .timeZoneOffset(TIME_ZONE_OFFSET) //
+                .name(NAME) //
+                .location(LOCATION) //
+                .internalLocation(INTERNAL_LOCATION) //
+                .compression(COMPRESSION) //
+                .fileFormatType(FILE_FORMAT_TYPE) //
+                .fileFormatVersion(FILE_FORMAT_VERSION) //
                 .build();
-        // @formatter:on
 
         Assertions.assertNotNull(consumerDmaapModel);
         Assertions.assertEquals(PRODUCT_NAME, consumerDmaapModel.getProductName());
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index 1bf3ec5..4283deb 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -67,7 +67,7 @@
         } catch (DatafileTaskException e) {
             throw e;
         } catch (Exception e) {
-            throw new DatafileTaskException("Could not open connection: ", e);
+            throw new DatafileTaskException("Could not open connection: " + e, e);
         }
     }
 
@@ -100,7 +100,7 @@
                 throw new DatafileTaskException("Could not retrieve file " + remoteFileName);
             }
         } catch (IOException e) {
-            throw new DatafileTaskException("Could not fetch file: ", e);
+            throw new DatafileTaskException("Could not fetch file: " + e, e);
         }
         logger.trace("collectFile fetched: {}", localFileName);
     }
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index 2f48916..dec8af4 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -79,7 +79,7 @@
                 sftpChannel = getChannel(session);
             }
         } catch (JSchException e) {
-            throw new DatafileTaskException("Could not open Sftp client", e);
+            throw new DatafileTaskException("Could not open Sftp client" + e, e);
         }
     }
 
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java
new file mode 100644
index 0000000..e01a941
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpAsyncClientBuilderWrapper.java
@@ -0,0 +1,67 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+
+import org.apache.http.client.RedirectStrategy;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class HttpAsyncClientBuilderWrapper implements IHttpAsyncClientBuilder {
+    HttpAsyncClientBuilder builder = HttpAsyncClients.custom();
+
+    @Override
+    public IHttpAsyncClientBuilder setRedirectStrategy(RedirectStrategy redirectStrategy) {
+        builder.setRedirectStrategy(redirectStrategy);
+        return this;
+    }
+
+    @Override
+    public IHttpAsyncClientBuilder setSSLContext(SSLContext sslcontext) {
+        builder.setSSLContext(sslcontext);
+        return this;
+    }
+
+    @Override
+    public IHttpAsyncClientBuilder setSSLHostnameVerifier(HostnameVerifier hostnameVerifier) {
+        builder.setSSLHostnameVerifier(hostnameVerifier);
+        return this;
+    }
+
+    @Override
+    public IHttpAsyncClientBuilder setDefaultRequestConfig(RequestConfig config) {
+        builder.setDefaultRequestConfig(config);
+        return this;
+    }
+
+    @Override
+    public CloseableHttpAsyncClient build() {
+        return builder.build();
+    }
+
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java
new file mode 100644
index 0000000..e0a51a8
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/http/IHttpAsyncClientBuilder.java
@@ -0,0 +1,43 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.http;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+
+import org.apache.http.client.RedirectStrategy;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public interface IHttpAsyncClientBuilder {
+    public IHttpAsyncClientBuilder setRedirectStrategy(final RedirectStrategy redirectStrategy);
+
+    public IHttpAsyncClientBuilder setSSLContext(final SSLContext sslcontext);
+
+    public IHttpAsyncClientBuilder setSSLHostnameVerifier(final HostnameVerifier hostnameVerifier);
+
+    public IHttpAsyncClientBuilder setDefaultRequestConfig(final RequestConfig config);
+
+    public CloseableHttpAsyncClient build();
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
deleted file mode 100644
index 5295b12..0000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Path;
-import org.springframework.core.io.FileSystemResource;
-
-public class FileSystemResourceWrapper implements IFileSystemResource {
-    private FileSystemResource realResource;
-
-    @Override
-    public void setPath(Path path) {
-        realResource = new FileSystemResource(path);
-    }
-    @Override
-    public InputStream getInputStream() throws IOException {
-        return realResource.getInputStream();
-    }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
deleted file mode 100644
index 23f14a3..0000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Path;
-
-public interface IFileSystemResource {
-
-    public void setPath(Path filePath);
-
-    public InputStream getInputStream() throws IOException;
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index 9304688..9bd5d57 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -16,54 +16,34 @@
 
 package org.onap.dcaegen2.collectors.datafile.service.producer;
 
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
-import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.security.KeyManagementException;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.Future;
 
 import javax.net.ssl.SSLContext;
 
 import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpResponse;
-import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
-import org.apache.http.impl.nio.client.HttpAsyncClients;
 import org.apache.http.ssl.SSLContextBuilder;
-import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
-import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
-import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.http.HttpAsyncClientBuilderWrapper;
+import org.onap.dcaegen2.collectors.datafile.http.IHttpAsyncClientBuilder;
 import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
 import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 import org.slf4j.Marker;
 import org.slf4j.MarkerFactory;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
 import org.springframework.web.util.DefaultUriBuilderFactory;
-
-import reactor.core.publisher.Mono;
+import org.springframework.web.util.UriBuilder;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
@@ -71,11 +51,7 @@
  */
 public class DmaapProducerReactiveHttpClient {
 
-    private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
-    private static final String NAME_JSON_TAG = "name";
-    private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
-    private static final String URI_SEPARATOR = "/";
-    private static final String DEFAULT_FEED_ID = "1";
+    private static final int NO_REQUEST_TIMEOUT = -1;
     private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
     private static final Marker INVOKE_RETURN = MarkerFactory.getMarker("INVOKE_RETURN");
 
@@ -83,14 +59,10 @@
 
     private final String dmaapHostName;
     private final Integer dmaapPortNumber;
-    private final String dmaapTopicName;
     private final String dmaapProtocol;
-    private final String dmaapContentType;
     private final String user;
     private final String pwd;
 
-    private IFileSystemResource fileResource = new FileSystemResourceWrapper();
-
     /**
      * Constructor DmaapProducerReactiveHttpClient.
      *
@@ -99,96 +71,86 @@
     public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
         this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
         this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
-        this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
         this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
-        this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
         this.user = dmaapPublisherConfiguration.dmaapUserName();
         this.pwd = dmaapPublisherConfiguration.dmaapUserPassword();
     }
 
-    /**
-     * Function for calling DMaaP HTTP producer - post request to DMaaP DataRouter.
-     *
-     * @param consumerDmaapModel - object which will be sent to DMaaP DataRouter
-     * @return status code of operation
-     */
-    public Mono<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel,
-            Map<String, String> contextMap) {
-        MdcVariables.setMdcContextMap(contextMap);
-       try (CloseableHttpAsyncClient webClient = createWebClient()) {
+    public HttpResponse getDmaapProducerResponseWithRedirect(HttpUriRequest request, Map<String, String> contextMap)
+            throws DatafileTaskException {
+        try (CloseableHttpAsyncClient webClient = createWebClient(true, NO_REQUEST_TIMEOUT)) {
+            MdcVariables.setMdcContextMap(contextMap);
+            webClient.start();
 
-            HttpPut put = new HttpPut();
-            prepareHead(consumerDmaapModel, put);
-            prepareBody(consumerDmaapModel, put);
-            addUserCredentialsToHead(put);
-
-            logger.trace(INVOKE, "Starting to publish to DR {}", consumerDmaapModel.getInternalLocation());
-            Future<HttpResponse> future = webClient.execute(put, null);
+            logger.trace(INVOKE, "Starting to produce to DR {}", request);
+            Future<HttpResponse> future = webClient.execute(request, null);
             HttpResponse response = future.get();
-            logger.trace(INVOKE_RETURN, "Response from DR {}", response);
-            return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
+            logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString());
+            return response;
         } catch (Exception e) {
-            logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
-            return Mono.error(e);
+            throw new DatafileTaskException("Unable to create web client.", e);
         }
     }
 
-    private void addUserCredentialsToHead(HttpPut put) {
+    public HttpResponse getDmaapProducerResponseWithCustomTimeout(HttpUriRequest request, int requestTimeout,
+            Map<String, String> contextMap) throws DatafileTaskException {
+        try (CloseableHttpAsyncClient webClient = createWebClient(false, requestTimeout)) {
+            MdcVariables.setMdcContextMap(contextMap);
+            webClient.start();
+
+            logger.trace(INVOKE, "Starting to produce to DR {}", request);
+            Future<HttpResponse> future = webClient.execute(request, null);
+            HttpResponse response = future.get();
+            logger.trace(INVOKE_RETURN, "Response from DR {}", response.toString());
+            return response;
+        } catch (Exception e) {
+            throw new DatafileTaskException("Unable to create web client.", e);
+        }
+    }
+
+    public void addUserCredentialsToHead(HttpUriRequest request) {
         String plainCreds = user + ":" + pwd;
         byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
         byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
         String base64Creds = new String(base64CredsBytes);
         logger.trace("base64Creds...: {}", base64Creds);
-        put.addHeader("Authorization", "Basic " + base64Creds);
+        request.addHeader("Authorization", "Basic " + base64Creds);
     }
 
-    private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
-        put.addHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType);
-        JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
-        String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
-        metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
-        put.addHeader(X_DMAAP_DR_META, metaData.toString());
-        put.setURI(getUri(name));
-
-        String requestID = MDC.get(REQUEST_ID);
-        put.addHeader(X_ONAP_REQUEST_ID, requestID);
-        String invocationID = UUID.randomUUID().toString();
-        put.addHeader(X_INVOCATION_ID, invocationID);
+    public UriBuilder getBaseUri() {
+        return new DefaultUriBuilderFactory().builder() //
+                .scheme(dmaapProtocol) //
+                .host(dmaapHostName) //
+                .port(dmaapPortNumber);
     }
 
-    private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
-        Path fileLocation = Paths.get(model.getInternalLocation());
-        this.fileResource.setPath(fileLocation);
-        InputStream fileInputStream = fileResource.getInputStream();
-
-        put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
-
-    }
-
-    private URI getUri(String fileName) {
-        String path = dmaapTopicName + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + fileName;
-        return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
-                .path(path).build();
-    }
-
-    void setFileSystemResource(IFileSystemResource fileSystemResource) {
-        fileResource = fileSystemResource;
-    }
-
-    protected CloseableHttpAsyncClient createWebClient()
+    private CloseableHttpAsyncClient createWebClient(boolean expectRedirect, int requestTimeout)
             throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
+        SSLContext sslContext =
+                new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
 
-        SSLContext sslContext = new SSLContextBuilder() //
-                .loadTrustMaterial(null, (certificate, authType) -> true) //
-                .build();
+        IHttpAsyncClientBuilder clientBuilder = getHttpClientBuilder();
+        clientBuilder.setSSLContext(sslContext) //
+                .setSSLHostnameVerifier(new NoopHostnameVerifier());
 
-        CloseableHttpAsyncClient webClient =  HttpAsyncClients.custom() //
-                .setSSLContext(sslContext) //
-                .setSSLHostnameVerifier(new NoopHostnameVerifier()) //
-                .setRedirectStrategy(PublishRedirectStrategy.INSTANCE) //
-                .build();
-        webClient.start();
-        return webClient;
+        if (expectRedirect) {
+            clientBuilder.setRedirectStrategy(PublishRedirectStrategy.INSTANCE);
+        }
+
+        if (requestTimeout > NO_REQUEST_TIMEOUT) {
+            RequestConfig requestConfig = RequestConfig.custom() //
+                    .setSocketTimeout(requestTimeout) //
+                    .setConnectTimeout(requestTimeout) //
+                    .setConnectionRequestTimeout(requestTimeout) //
+                    .build();
+
+            clientBuilder.setDefaultRequestConfig(requestConfig);
+        }
+
+        return clientBuilder.build();
     }
 
+    IHttpAsyncClientBuilder getHttpClientBuilder() {
+        return new HttpAsyncClientBuilderWrapper();
+    }
 }
\ No newline at end of file
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
index 06ff570..91c4c33 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
@@ -16,51 +16,44 @@
 
 package org.onap.dcaegen2.collectors.datafile.service.producer;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
 import java.security.KeyManagementException;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import javax.net.ssl.SSLContext;
+
 import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.io.IOUtils;
+import org.apache.http.Header;
 import org.apache.http.HttpResponse;
-import org.apache.http.StatusLine;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPut;
-import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
 import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
-import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.mockito.ArgumentCaptor;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.http.IHttpAsyncClientBuilder;
+import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-
-import reactor.test.StepVerifier;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
@@ -68,32 +61,24 @@
  */
 class DmaapProducerReactiveHttpClientTest {
 
-    private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
-    private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
-    private static final String NAME_JSON_TAG = "name";
-    private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
-
     private static final String HOST = "54.45.33.2";
     private static final String HTTPS_SCHEME = "https";
     private static final int PORT = 1234;
-    private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
-    private static final String URI_SEPARATOR = "/";
-    private static final String PUBLISH_TOPIC = "publish";
-    private static final String DEFAULT_FEED_ID = "1";
-    private static final String FILE_CONTENT = "Just a string.";
+    private static final String USER_NAME = "dradmin";
+    private static final int TWO_SECOND_TIMEOUT = 2000;
 
-    private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
+    private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
+
+
+    private DmaapProducerReactiveHttpClient producerClientUnderTestSpy;
 
     private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
-    private ConsumerDmaapModel consumerDmaapModel;
 
-    private IFileSystemResource fileSystemResourceMock = mock(IFileSystemResource.class);
+    private IHttpAsyncClientBuilder clientBuilderMock;
+
     private CloseableHttpAsyncClient clientMock;
-    private HttpResponse responseMock = mock(HttpResponse.class);
     @SuppressWarnings("unchecked")
     private Future<HttpResponse> futureMock = mock(Future.class);
-    private StatusLine statusLine = mock(StatusLine.class);
-    private InputStream fileStream;
 
     @BeforeEach
     void setUp() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
@@ -102,82 +87,114 @@
         when(dmaapPublisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
         when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("dradmin");
         when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("dradmin");
-        when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn(APPLICATION_OCTET_STREAM_CONTENT_TYPE);
-        when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn(PUBLISH_TOPIC);
 
-        // @formatter:off
-        consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
-                .productName("NrRadio")
-                .vendorName("Ericsson")
-                .lastEpochMicrosec("8745745764578")
-                .sourceName("oteNB5309")
-                .startEpochMicrosec("8745745764578")
-                .timeZoneOffset("UTC+05:00")
-                .name("A20161224.1030-1045.bin.gz")
-                .location("ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz")
-                .internalLocation("target/A20161224.1030-1045.bin.gz")
-                .compression("gzip")
-                .fileFormatType("org.3GPP.32.435#measCollec")
-                .fileFormatVersion("V10")
-                .build();
-        //formatter:on
+        producerClientUnderTestSpy = spy(new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock));
 
-        dmaapProducerReactiveHttpClient = spy(new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock));
-        dmaapProducerReactiveHttpClient.setFileSystemResource(fileSystemResourceMock);
+        clientBuilderMock = mock(IHttpAsyncClientBuilder.class);
         clientMock = mock(CloseableHttpAsyncClient.class);
-        doReturn(clientMock).when(dmaapProducerReactiveHttpClient).createWebClient();
     }
 
     @Test
-    void getHttpResponse_Success() throws Exception {
-        mockWebClientDependantObject();
+    void getHttpResponseWithRederict_Success() throws Exception {
+        doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder();
+        when(clientBuilderMock.setSSLContext(any(SSLContext.class))).thenReturn(clientBuilderMock);
+        when(clientBuilderMock.setSSLHostnameVerifier(any(NoopHostnameVerifier.class))).thenReturn(clientBuilderMock);
+        when(clientBuilderMock.build()).thenReturn(clientMock);
+        when(clientMock.execute(any(HttpUriRequest.class), any())).thenReturn(futureMock);
+        HttpResponse responseMock = mock(HttpResponse.class);
+        when(futureMock.get()).thenReturn(responseMock);
 
-        HttpPut httpPut = new HttpPut();
-        httpPut.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE);
+        HttpGet request = new HttpGet();
+        producerClientUnderTestSpy.getDmaapProducerResponseWithRedirect(request, CONTEXT_MAP);
 
-        URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT)
-                .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build();
-        httpPut.setURI(expectedUri);
+        verify(clientBuilderMock).setSSLContext(any(SSLContext.class));
+        verify(clientBuilderMock).setSSLHostnameVerifier(any(NoopHostnameVerifier.class));
+        verify(clientBuilderMock).setRedirectStrategy(PublishRedirectStrategy.INSTANCE);
+        verify(clientBuilderMock).build();
+        verifyNoMoreInteractions(clientBuilderMock);
 
-        JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
-        metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
-        metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
-        httpPut.addHeader(X_DMAAP_DR_META, metaData.toString());
+        verify(clientMock).start();
+        verify(clientMock).close();
 
-        String plainCreds = "dradmin" + ":" + "dradmin";
+        verify(futureMock).get();
+        verifyNoMoreInteractions(futureMock);
+    }
+
+    @Test
+    void getHttpResponseWithCustomTimeout_Success() throws Exception {
+        doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder();
+        when(clientBuilderMock.setSSLContext(any(SSLContext.class))).thenReturn(clientBuilderMock);
+        when(clientBuilderMock.setDefaultRequestConfig(any(RequestConfig.class))).thenReturn(clientBuilderMock);
+        when(clientBuilderMock.build()).thenReturn(clientMock);
+        when(clientMock.execute(any(HttpUriRequest.class), any())).thenReturn(futureMock);
+        HttpResponse responseMock = mock(HttpResponse.class);
+        when(futureMock.get()).thenReturn(responseMock);
+
+        HttpGet request = new HttpGet();
+        producerClientUnderTestSpy.getDmaapProducerResponseWithCustomTimeout(request, TWO_SECOND_TIMEOUT, CONTEXT_MAP);
+
+        ArgumentCaptor<RequestConfig> requestConfigCaptor = ArgumentCaptor.forClass(RequestConfig.class);
+        verify(clientBuilderMock).setSSLContext(any(SSLContext.class));
+        verify(clientBuilderMock).setSSLHostnameVerifier(any(NoopHostnameVerifier.class));
+        verify(clientBuilderMock).setDefaultRequestConfig(requestConfigCaptor.capture());
+        RequestConfig requestConfig = requestConfigCaptor.getValue();
+        assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getSocketTimeout());
+        assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getConnectTimeout());
+        assertEquals(TWO_SECOND_TIMEOUT, requestConfig.getConnectionRequestTimeout());
+        verify(clientBuilderMock).build();
+        verifyNoMoreInteractions(clientBuilderMock);
+
+        verify(clientMock).start();
+        verify(clientMock).close();
+
+        verify(futureMock).get();
+        verifyNoMoreInteractions(futureMock);
+    }
+
+    @Test
+    public void getResponseWithException_throwsException() throws Exception {
+        doReturn(clientBuilderMock).when(producerClientUnderTestSpy).getHttpClientBuilder();
+        when(clientBuilderMock.setDefaultRequestConfig(any(RequestConfig.class))).thenReturn(clientBuilderMock);
+        when(clientBuilderMock.setSSLContext(any(SSLContext.class))).thenReturn(clientBuilderMock);
+        when(clientBuilderMock.build()).thenReturn(clientMock);
+        HttpPut request = new HttpPut();
+        when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock);
+
+        try {
+            when(futureMock.get()).thenThrow(new InterruptedException("Interrupted"));
+
+            producerClientUnderTestSpy.getDmaapProducerResponseWithCustomTimeout(request, TWO_SECOND_TIMEOUT,
+                    CONTEXT_MAP);
+
+            fail("Should have got an exception.");
+        } catch (DatafileTaskException e) {
+            assertTrue(e.getCause() instanceof InterruptedException);
+            assertEquals("Interrupted", e.getCause().getMessage());
+        } catch (Exception e) {
+            fail("Wrong exception");
+        }
+
+        verify(clientMock).start();
+        verify(clientMock).close();
+    }
+
+    @Test
+    public void addCredentialsToHead_success() {
+        HttpPut request = new HttpPut();
+
+        producerClientUnderTestSpy.addUserCredentialsToHead(request);
+
+        String plainCreds = USER_NAME + ":" + USER_NAME;
         byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
         byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
-        String base64Creds = new String(base64CredsBytes);
-        httpPut.addHeader("Authorization", "Basic " + base64Creds);
-
-        fileStream.reset();
-        Map<String, String> contextMap = new HashMap<>();
-        StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel, contextMap))
-        .expectNext(HttpStatus.OK).verifyComplete();
-
-        verify(fileSystemResourceMock).setPath(Paths.get("target/" + FILE_NAME));
-        InputStream fileInputStream = fileSystemResourceMock.getInputStream();
-        httpPut.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+        String base64Creds = "Basic " + new String(base64CredsBytes);
+        Header[] authorizationHeaders = request.getHeaders("Authorization");
+        assertEquals(base64Creds, authorizationHeaders[0].getValue());
     }
 
     @Test
-    void getHttpResponse_Fail() throws Exception {
-        Map<String, String> contextMap = new HashMap<>();
-        doReturn(futureMock).when(clientMock).execute(any(), any());
-        doThrow(new InterruptedException()).when(futureMock).get();
-        StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel, contextMap)) //
-           .expectError() //
-           .verify(); //
-    }
-
-    private void mockWebClientDependantObject()
-            throws IOException, InterruptedException, ExecutionException {
-        fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes());
-        when(fileSystemResourceMock.getInputStream()).thenReturn(fileStream);
-        when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock);
-        when(futureMock.get()).thenReturn(responseMock);
-        when(responseMock.getStatusLine()).thenReturn(statusLine);
-        when(statusLine.getStatusCode()).thenReturn(HttpUtils.SC_OK);
-
+    public void getBaseUri_success() {
+        URI uri = producerClientUnderTestSpy.getBaseUri().build();
+        assertEquals(HTTPS_SCHEME + "://" + HOST + ":" + PORT, uri.toString());
     }
 }
