Datafile collector: send messages to Kafka instead of to the Data Router

Change-Id: I44e033dda27439e3ccb34500e45ff163ac15b35f
diff --git a/datafile/datafile-app-server/config/application.yaml b/datafile/datafile-app-server/config/application.yaml
index bd589e9..a77b15b 100644
--- a/datafile/datafile-app-server/config/application.yaml
+++ b/datafile/datafile-app-server/config/application.yaml
@@ -22,9 +22,15 @@
     org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
     org.onap.dcaegen2.collectors.datafile: WARN
     org.onap.dcaegen2: WARN
-  file: /var/log/ONAP/application.log
+  file:
+    name: /var/log/ONAP/application.log
 app:
   filepath: config/datafile_endpoints_test.json
+  # Kafka bootstrap servers. This is only needed if there are Information Types that use a kafkaInputTopic.
+  # Several redundant bootstrap servers can be specified, separated by a comma ','.
+  kafka:
+    bootstrap-servers: localhost:9092
+    collected-file-topic: collected-file
 
 springdoc:
   show-actuator: true
diff --git a/datafile/datafile-app-server/pom.xml b/datafile/datafile-app-server/pom.xml
index 14620ed..ff865be 100644
--- a/datafile/datafile-app-server/pom.xml
+++ b/datafile/datafile-app-server/pom.xml
@@ -5,7 +5,7 @@
   ~ Copyright (C) 2018-2021 Nordix Foundation. All rights reserved.
   ~ Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
   ~ Copyright (c) 2021 Samsung Electronics. All rights reserved.
-  ~ ================================================================================ 
+  ~ ================================================================================
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
   ~ You may obtain a copy of the License at
@@ -23,26 +23,31 @@
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
-
     <parent>
         <groupId>org.onap.dcaegen2.collectors</groupId>
         <artifactId>datafile</artifactId>
         <version>1.8.0-SNAPSHOT</version>
     </parent>
-
     <groupId>org.onap.dcaegen2.collectors.datafile</groupId>
     <artifactId>datafile-app-server</artifactId>
     <packaging>jar</packaging>
-
     <properties>
         <docker.image.name>onap/${project.groupId}.${project.artifactId}</docker.image.name>
         <maven.build.timestamp.format>yyyyMMdd'T'HHmmss</maven.build.timestamp.format>
         <docker-client.version>8.7.1</docker-client.version>
         <tomcat-embed-core.version>9.0.56</tomcat-embed-core.version>
     </properties>
-
     <dependencies>
         <dependency>
+            <groupId>io.projectreactor.kafka</groupId>
+            <artifactId>reactor-kafka</artifactId>
+            <version>1.3.9</version>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
             <artifactId>cbs-client</artifactId>
         </dependency>
@@ -121,7 +126,6 @@
             <groupId>org.apache.httpcomponents.core5</groupId>
             <artifactId>httpcore5</artifactId>
         </dependency>
-
         <!-- Actuator dependencies -->
         <dependency>
             <groupId>org.springframework.boot</groupId>
@@ -131,7 +135,6 @@
             <groupId>javax.xml.bind</groupId>
             <artifactId>jaxb-api</artifactId>
         </dependency>
-
         <!--TESTS DEPENDENCIES -->
         <dependency>
             <groupId>io.projectreactor</groupId>
@@ -179,7 +182,6 @@
             <artifactId>powermock-api-mockito2</artifactId>
             <scope>test</scope>
         </dependency>
-
         <!--REQUIRED TO GENERATE DOCUMENTATION -->
         <dependency>
             <groupId>io.springfox</groupId>
@@ -214,14 +216,12 @@
             <artifactId>jackson-databind</artifactId>
         </dependency>
     </dependencies>
-
     <build>
         <resources>
             <resource>
                 <directory>src/main/resources</directory>
             </resource>
         </resources>
-
         <plugins>
             <plugin>
                 <groupId>org.springframework.boot</groupId>
@@ -287,4 +287,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
+</project>
\ No newline at end of file
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java
index 520be3f..851db32 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/MainApp.java
@@ -21,7 +21,6 @@
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.annotation.EnableScheduling;
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/Scheme.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/Scheme.java
index b9aa644..613fa39 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/Scheme.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/Scheme.java
@@ -31,7 +31,8 @@
     FTPES, SFTP, HTTP, HTTPS;
 
     public static final String DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG = "DFC does not support protocol ";
-    public static final String SUPPORTED_PROTOCOLS_ERROR_MESSAGE = ". Supported protocols are FTPeS, sFTP, HTTP and HTTPS";
+    public static final String SUPPORTED_PROTOCOLS_ERROR_MESSAGE =
+        ". Supported protocols are FTPeS, sFTP, HTTP and HTTPS";
 
     /**
      * Get a <code>Scheme</code> from a string.
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/SecurityUtil.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/SecurityUtil.java
index 79db32d..9d6b7f9 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/SecurityUtil.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/commons/SecurityUtil.java
@@ -15,20 +15,22 @@
  */
 package org.onap.dcaegen2.collectors.datafile.commons;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Utility class containing functions used for certificates configuration
  *
  * @author <a href="mailto:krzysztof.gajewski@nokia.com">Krzysztof Gajewski</a>
  */
 public final class SecurityUtil {
-    private SecurityUtil() {}
+    private SecurityUtil() {
+    }
+
     private static final Logger logger = LoggerFactory.getLogger(SecurityUtil.class);
 
     public static String getKeystorePasswordFromFile(String passwordPath) {
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index 0691d72..8560e59 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -34,7 +34,6 @@
 import java.util.Properties;
 import java.util.ServiceLoader;
 
-import javax.validation.constraints.NotEmpty;
 import javax.validation.constraints.NotNull;
 
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
@@ -49,9 +48,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
 import org.springframework.stereotype.Component;
 
 import reactor.core.Disposable;
@@ -61,14 +58,13 @@
 /**
  * Holds all configuration for the DFC.
  *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on
+ *         3/23/18
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
  */
 
 @Component
-@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers")
 @EnableConfigurationProperties
-@ConfigurationProperties("app")
 public class AppConfig {
 
     private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);
@@ -81,11 +77,17 @@
     private SftpConfig sftpConfiguration;
     private Disposable refreshConfigTask = null;
 
-    @NotEmpty
-    private String filepath;
+    @Value("${app.filepath}")
+    String filepath;
 
-    public synchronized void setFilepath(String filepath) {
-        this.filepath = filepath;
+    @Value("${app.kafka.bootstrap-servers:}")
+    private String kafkaBootStrapServers;
+
+    @Value("${app.kafka.collected-file-topic:}")
+    public String collectedFileTopic;
+
+    public String getKafkaBootStrapServers() {
+        return kafkaBootStrapServers;
     }
 
     /**
@@ -103,8 +105,7 @@
     }
 
     Flux<AppConfig> createRefreshTask() {
-        return createCbsClientConfiguration()
-            .flatMap(this::createCbsClient)
+        return createCbsClientConfiguration().flatMap(this::createCbsClient)
             .flatMapMany(this::periodicConfigurationUpdates) //
             .map(this::parseCloudConfig) //
             .onErrorResume(this::onErrorResume);
@@ -134,19 +135,24 @@
     /**
      * Checks if there is a configuration for the given feed.
      *
-     * @param changeIdentifier the change identifier the feed is configured to belong to.
-     * @return true if a feed is configured for the given change identifier, false if not.
+     * @param changeIdentifier the change identifier the feed is configured to
+     *        belong to.
+     * @return true if a feed is configured for the given change identifier, false
+     *         if not.
      */
-    public synchronized boolean isFeedConfigured(String changeIdentifier) {
+    private synchronized boolean isFeedConfigured(String changeIdentifier) {
         return publishingConfigurations.containsKey(changeIdentifier);
     }
 
     /**
      * Gets the feed configuration for the given change identifier.
      *
-     * @param changeIdentifier the change identifier the feed is configured to belong to.
-     * @return the <code>PublisherConfiguration</code> for the feed belonging to the given change identifier.
-     * @throws DatafileTaskException if no configuration has been loaded or the configuration is missing for the given
+     * @param changeIdentifier the change identifier the feed is configured to
+     *        belong to.
+     * @return the <code>PublisherConfiguration</code> for the feed belonging to the
+     *         given change identifier.
+     * @throws DatafileTaskException if no configuration has been loaded or the
+     *         configuration is missing for the given
      *         change identifier.
      */
     public synchronized PublisherConfiguration getPublisherConfiguration(String changeIdentifier)
@@ -191,11 +197,9 @@
 
     private AppConfig parseCloudConfig(JsonObject configurationObject) {
         try {
-            CloudConfigParser parser =
-                new CloudConfigParser(configurationObject, systemEnvironment);
-            setConfiguration(parser.getConsumerConfiguration(),
-                parser.getDmaapPublisherConfigurations(), parser.getCertificateConfig(),
-                parser.getSftpConfig());
+            CloudConfigParser parser = new CloudConfigParser(configurationObject, systemEnvironment);
+            setConfiguration(parser.getConsumerConfiguration(), parser.getDmaapPublisherConfigurations(),
+                parser.getCertificateConfig(), parser.getSftpConfig());
             logConfig();
         } catch (DatafileTaskException e) {
             logger.error("Could not parse configuration {}", e.toString(), e);
@@ -227,8 +231,8 @@
     }
 
     private synchronized void setConfiguration(@NotNull ConsumerConfiguration consumerConfiguration,
-        @NotNull Map<String, PublisherConfiguration> publisherConfiguration, @NotNull CertificateConfig certificateConfig,
-        @NotNull SftpConfig sftpConfig) throws DatafileTaskException {
+        @NotNull Map<String, PublisherConfiguration> publisherConfiguration,
+        @NotNull CertificateConfig certificateConfig, @NotNull SftpConfig sftpConfig) throws DatafileTaskException {
         this.dmaapConsumerConfiguration = consumerConfiguration;
         this.publishingConfigurations = publisherConfiguration;
         this.certificateConfiguration = certificateConfig;
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
index db811fa..efd2c89 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParser.java
@@ -22,6 +22,8 @@
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 
+import io.vavr.collection.Stream;
+
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
@@ -32,7 +34,6 @@
 
 import javax.validation.constraints.NotNull;
 
-import io.vavr.collection.Stream;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
@@ -70,8 +71,10 @@
     private static final String CBS_PROPERTY_SFTP_SECURITY_STRICT_HOST_KEY_CHECKING =
         "sftp.security.strictHostKeyChecking";
 
-    private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP = "dmaap.dmaapConsumerConfiguration.consumerGroup";
-    private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID = "dmaap.dmaapConsumerConfiguration.consumerId";
+    private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP =
+        "dmaap.dmaapConsumerConfiguration.consumerGroup";
+    private static final String DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID =
+        "dmaap.dmaapConsumerConfiguration.consumerId";
     private static final String DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS = "dmaap.dmaapConsumerConfiguration.timeoutMs";
     private static final int EXPECTED_NUMBER_OF_SOURCE_TOPICS = 1;
     private static final int FIRST_SOURCE_INDEX = 0;
@@ -107,10 +110,11 @@
                 .publishUrl(getAsString(feedConfig, "publish_url")) //
                 .password(getAsString(feedConfig, "password")) //
                 .userName(getAsString(feedConfig, "username")) //
-                .trustStorePath(getAsOptionalStringOrDefault(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH,"")) //
-                .trustStorePasswordPath(getAsOptionalStringOrDefault(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH, "")) //
-                .keyStorePath(getAsOptionalStringOrDefault(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH,"")) //
-                .keyStorePasswordPath(getAsOptionalStringOrDefault(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH,"")) //
+                .trustStorePath(getAsOptionalStringOrDefault(jsonObject, DMAAP_SECURITY_TRUST_STORE_PATH, "")) //
+                .trustStorePasswordPath(
+                    getAsOptionalStringOrDefault(jsonObject, DMAAP_SECURITY_TRUST_STORE_PASS_PATH, "")) //
+                .keyStorePath(getAsOptionalStringOrDefault(jsonObject, DMAAP_SECURITY_KEY_STORE_PATH, "")) //
+                .keyStorePasswordPath(getAsOptionalStringOrDefault(jsonObject, DMAAP_SECURITY_KEY_STORE_PASS_PATH, "")) //
                 .enableDmaapCertAuth(get(jsonObject, DMAAP_SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean()) //
                 .changeIdentifier(changeIdentifier) //
                 .logUrl(getAsString(feedConfig, "log_url")) //
@@ -131,8 +135,10 @@
         try {
             MessageRouterSubscriberConfig messageRouterSubscriberConfig = getMessageRouterSubscriberConfig();
             MessageRouterSubscribeRequest messageRouterSubscribeRequest = getMessageRouterSubscribeRequest();
-            MessageRouterSubscriber messageRouterSubscriber = DmaapClientFactory.createMessageRouterSubscriber(messageRouterSubscriberConfig);
-            return new ConsumerConfiguration(messageRouterSubscriberConfig, messageRouterSubscriber, messageRouterSubscribeRequest);
+            MessageRouterSubscriber messageRouterSubscriber =
+                DmaapClientFactory.createMessageRouterSubscriber(messageRouterSubscriberConfig);
+            return new ConsumerConfiguration(messageRouterSubscriberConfig, messageRouterSubscriber,
+                messageRouterSubscribeRequest);
         } catch (Exception e) {
             throw new DatafileTaskException("Could not parse message router consumer configuration", e);
         }
@@ -140,8 +146,7 @@
 
     private MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() throws DatafileTaskException {
         return ImmutableMessageRouterSubscriberConfig.builder()
-            .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys() : null)
-            .build();
+            .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys() : null).build();
     }
 
     private SecurityKeys createSecurityKeys() throws DatafileTaskException {
@@ -169,8 +174,7 @@
             .consumerGroup(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_GROUP))
             .sourceDefinition(parsedSource)
             .consumerId(getAsString(jsonObject, DMAAP_CONSUMER_CONFIGURATION_CONSUMER_ID))
-            .timeout(Duration.ofMillis(get(jsonObject, DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS).getAsLong()))
-            .build();
+            .timeout(Duration.ofMillis(get(jsonObject, DMAAP_CONSUMER_CONFIGURATION_TIMEOUT_MS).getAsLong())).build();
     }
 
     /**
@@ -193,8 +197,8 @@
      * @throws DatafileTaskException if a member of the configuration is missing.
      */
     public @NotNull CertificateConfig getCertificateConfig() throws DatafileTaskException {
-        boolean enableCertAuth = getAsBooleanOrDefault(jsonObject, "dmaap.certificateConfig.enableCertAuth",
-            Boolean.TRUE);
+        boolean enableCertAuth =
+            getAsBooleanOrDefault(jsonObject, "dmaap.certificateConfig.enableCertAuth", Boolean.TRUE);
 
         String keyCert = "";
         String keyPasswordPath = "";
@@ -209,23 +213,18 @@
                 keyPasswordPath = getAsString(jsonObject, "dmaap.certificateConfig.keyPasswordPath");
                 trustedCa = getAsString(jsonObject, "dmaap.certificateConfig.trustedCa");
                 trustedCaPasswordPath = getAsString(jsonObject, "dmaap.certificateConfig.trustedCaPasswordPath");
-                httpsHostnameVerify = getAsBooleanOrDefault(jsonObject, "dmaap.certificateConfig.httpsHostnameVerify",
-                    Boolean.TRUE);
+                httpsHostnameVerify =
+                    getAsBooleanOrDefault(jsonObject, "dmaap.certificateConfig.httpsHostnameVerify", Boolean.TRUE);
             } catch (DatafileTaskException e) {
                 throw new DatafileTaskException(
-                    "Wrong configuration. External certificate enabled but configs are missing: "
-                        + e.getMessage());
+                    "Wrong configuration. External certificate enabled but configs are missing: " + e.getMessage());
             }
         }
 
         return new ImmutableCertificateConfig.Builder() //
-            .keyCert(keyCert)
-            .keyPasswordPath(keyPasswordPath)
-            .trustedCa(trustedCa)
+            .keyCert(keyCert).keyPasswordPath(keyPasswordPath).trustedCa(trustedCa)
             .trustedCaPasswordPath(trustedCaPasswordPath) //
-            .httpsHostnameVerify(httpsHostnameVerify)
-            .enableCertAuth(enableCertAuth)
-            .build();
+            .httpsHostnameVerify(httpsHostnameVerify).enableCertAuth(enableCertAuth).build();
     }
 
     private String determineKnownHostsFilePath() {
@@ -257,7 +256,6 @@
         }
     }
 
-
     private static @NotNull Boolean getAsBoolean(JsonObject obj, String memberName) throws DatafileTaskException {
         return get(obj, memberName).getAsBoolean();
     }
@@ -274,7 +272,8 @@
         return get(obj, memberName).getAsJsonObject();
     }
 
-    private static @NotNull Path getAsPath(JsonObject obj, String dmaapSecurityKeyStorePath) throws DatafileTaskException {
+    private static @NotNull Path getAsPath(JsonObject obj, String dmaapSecurityKeyStorePath)
+        throws DatafileTaskException {
         return Paths.get(getAsString(obj, dmaapSecurityKeyStorePath));
     }
 
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
index fa7d784..8fc9366 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/PublisherConfiguration.java
@@ -17,11 +17,9 @@
 
 package org.onap.dcaegen2.collectors.datafile.configuration;
 
-
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
 
-
 @Value.Immutable
 @Value.Style(redactedMask = "####")
 @Gson.TypeAdapters
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
index 35a3159..b7dc521 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SwaggerConfig.java
@@ -24,10 +24,13 @@
 import io.swagger.v3.oas.annotations.info.License;
 
 @OpenAPIDefinition(
-    info = @Info(title = SwaggerConfig.API_TITLE, version = SwaggerConfig.VERSION,
-        description = SwaggerConfig.DESCRIPTION, license = @License(name = "Copyright (C) 2020 Nordix Foundation. Licensed under the Apache License.",
-        url = "http://www.apache.org/licenses/LICENSE-2.0"))
-)
+    info = @Info(
+        title = SwaggerConfig.API_TITLE,
+        version = SwaggerConfig.VERSION,
+        description = SwaggerConfig.DESCRIPTION,
+        license = @License(
+            name = "Copyright (C) 2020 Nordix Foundation. Licensed under the Apache License.",
+            url = "http://www.apache.org/licenses/LICENSE-2.0")))
 public class SwaggerConfig {
 
     public static final String VERSION = "1.0";
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index aeb4c72..be508e1 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -21,6 +21,7 @@
 
 import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.tags.Tag;
+
 import org.onap.dcaegen2.collectors.datafile.configuration.SchedulerConfig;
 import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
 import org.slf4j.Logger;
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java
index aaebfbf..293a9d1 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusController.java
@@ -24,6 +24,7 @@
 import io.swagger.annotations.ApiResponses;
 import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.tags.Tag;
+
 import org.onap.dcaegen2.collectors.datafile.model.Counters;
 import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
 import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index e972aa3..4e29fcb 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -107,6 +107,7 @@
             }
         }
     }
+
     JSch createJsch() {
         return new JSch();
     }
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientSettings.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientSettings.java
index a4c5cb3..a2d5bda 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientSettings.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientSettings.java
@@ -17,6 +17,7 @@
 package org.onap.dcaegen2.collectors.datafile.ftp;
 
 import java.io.File;
+
 import org.onap.dcaegen2.collectors.datafile.configuration.SftpConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java
index 4564a44..7c25cca 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClient.java
@@ -15,21 +15,6 @@
  */
 package org.onap.dcaegen2.collectors.datafile.http;
 
-import org.jetbrains.annotations.NotNull;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
-import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import reactor.core.Disposable;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.netty.http.client.HttpClient;
-import reactor.netty.http.client.HttpClientResponse;
-import reactor.netty.resources.ConnectionProvider;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
@@ -38,6 +23,22 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+import reactor.netty.http.client.HttpClientResponse;
+import reactor.netty.resources.ConnectionProvider;
+
 /**
  * Gets file from PNF with HTTP protocol.
  *
@@ -45,7 +46,7 @@
  */
 public class DfcHttpClient implements FileCollectClient {
 
-    //Be aware to be less than ScheduledTasks.NUMBER_OF_WORKER_THREADS
+    // Keep this value less than ScheduledTasks.NUMBER_OF_WORKER_THREADS
     private static final int MAX_NUMBER_OF_CONNECTIONS = 200;
     private static final Logger logger = LoggerFactory.getLogger(DfcHttpClient.class);
     private static final ConnectionProvider pool = ConnectionProvider.create("default", MAX_NUMBER_OF_CONNECTIONS);
@@ -59,12 +60,13 @@
         this.fileServerData = fileServerData;
     }
 
-    @Override public void open() throws DatafileTaskException {
+    @Override
+    public void open() throws DatafileTaskException {
         logger.trace("Setting httpClient for file download.");
 
         String authorizationContent = getAuthorizationContent();
-        this.client = HttpClient.create(pool).keepAlive(true).headers(
-            h -> h.add("Authorization", authorizationContent));
+        this.client =
+            HttpClient.create(pool).keepAlive(true).headers(h -> h.add("Authorization", authorizationContent));
 
         logger.trace("httpClient, auth header was set.");
     }
@@ -80,7 +82,8 @@
         return HttpUtils.basicAuthContent(this.fileServerData.userId(), this.fileServerData.password());
     }
 
-    @Override public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+    @Override
+    public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
         logger.trace("Prepare to collectFile {}", localFile);
         CountDownLatch latch = new CountDownLatch(1);
         final AtomicReference<Exception> errorMessage = new AtomicReference<>();
@@ -111,7 +114,9 @@
         return (errorMessage.get() != null);
     }
 
-    @NotNull protected Consumer<Throwable> processFailedConnectionWithServer(CountDownLatch latch, AtomicReference<Exception> errorMessages) {
+    @NotNull
+    protected Consumer<Throwable> processFailedConnectionWithServer(CountDownLatch latch,
+        AtomicReference<Exception> errorMessages) {
         return (Throwable response) -> {
             Exception e = new Exception("Error in connection has occurred during file download", response);
             errorMessages.set(new DatafileTaskException(response.getMessage(), e));
@@ -122,8 +127,9 @@
         };
     }
 
-    @NotNull protected Consumer<InputStream> processDataFromServer(Path localFile, CountDownLatch latch,
-            AtomicReference<Exception> errorMessages) {
+    @NotNull
+    protected Consumer<InputStream> processDataFromServer(Path localFile, CountDownLatch latch,
+        AtomicReference<Exception> errorMessages) {
         return (InputStream response) -> {
             logger.trace("Starting to process response.");
             try {
@@ -140,19 +146,18 @@
     }
 
     protected Flux<InputStream> getServerResponse(String remoteFile) {
-        return client.get()
-            .uri(HttpUtils.prepareHttpUri(fileServerData, remoteFile))
+        return client.get().uri(HttpUtils.prepareHttpUri(fileServerData, remoteFile))
             .response((responseReceiver, byteBufFlux) -> {
                 logger.trace("HTTP response status - {}", responseReceiver.status());
-                if(isResponseOk(responseReceiver)){
+                if (isResponseOk(responseReceiver)) {
                     return byteBufFlux.aggregate().asInputStream();
                 }
                 if (isErrorInConnection(responseReceiver)) {
                     return Mono.error(new NonRetryableDatafileTaskException(
                         HttpUtils.nonRetryableResponse(getResponseCode(responseReceiver))));
                 }
-                return Mono.error(new DatafileTaskException(
-                    HttpUtils.retryableResponse(getResponseCode(responseReceiver))));
+                return Mono
+                    .error(new DatafileTaskException(HttpUtils.retryableResponse(getResponseCode(responseReceiver))));
             });
     }
 
@@ -168,7 +173,8 @@
         return getResponseCode(httpClientResponse) >= 400;
     }
 
-    @Override public void close() {
+    @Override
+    public void close() {
         logger.trace("Starting http client disposal.");
         disposableClient.dispose();
         logger.trace("Http client disposed.");
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpsClient.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpsClient.java
index 9bb0118..872f1e6 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpsClient.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpsClient.java
@@ -15,6 +15,16 @@
  */
 package org.onap.dcaegen2.collectors.datafile.http;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+
+import javax.net.ssl.SSLHandshakeException;
+import javax.net.ssl.SSLPeerUnverifiedException;
+
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.config.RequestConfig;
@@ -35,15 +45,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLHandshakeException;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.UnknownHostException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-
 /**
  * Gets file from PNF with HTTPS protocol.
  *
@@ -59,31 +60,26 @@
     private final FileServerData fileServerData;
     private final PoolingHttpClientConnectionManager connectionManager;
 
-    public DfcHttpsClient(FileServerData fileServerData, PoolingHttpClientConnectionManager  connectionManager) {
+    public DfcHttpsClient(FileServerData fileServerData, PoolingHttpClientConnectionManager connectionManager) {
         this.fileServerData = fileServerData;
         this.connectionManager = connectionManager;
     }
 
-    @Override public void open() {
+    @Override
+    public void open() {
         logger.trace("Setting httpsClient for file download.");
-        SocketConfig socketConfig = SocketConfig.custom()
-            .setSoKeepAlive(true)
-            .build();
+        SocketConfig socketConfig = SocketConfig.custom().setSoKeepAlive(true).build();
 
-        RequestConfig requestConfig = RequestConfig.custom()
-            .setConnectTimeout(FIFTEEN_SECONDS)
-            .build();
+        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(FIFTEEN_SECONDS).build();
 
-        httpsClient = HttpClients.custom()
-            .setConnectionManager(connectionManager)
-            .setDefaultSocketConfig(socketConfig)
-            .setDefaultRequestConfig(requestConfig)
-            .build();
+        httpsClient = HttpClients.custom().setConnectionManager(connectionManager).setDefaultSocketConfig(socketConfig)
+            .setDefaultRequestConfig(requestConfig).build();
 
         logger.trace("httpsClient prepared for connection.");
     }
 
-    @Override public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+    @Override
+    public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
         logger.trace("Prepare to collectFile {}", localFile);
         HttpGet httpGet = new HttpGet(HttpUtils.prepareHttpsUri(fileServerData, remoteFile));
 
@@ -126,8 +122,7 @@
         return this.fileServerData.userId().isEmpty() && this.fileServerData.password().isEmpty();
     }
 
-    protected HttpResponse makeCall(HttpGet httpGet)
-            throws IOException, DatafileTaskException {
+    protected HttpResponse makeCall(HttpGet httpGet) throws IOException, DatafileTaskException {
         try {
             HttpResponse httpResponse = executeHttpClient(httpGet);
             if (isResponseOk(httpResponse)) {
@@ -140,14 +135,13 @@
             }
             throw new DatafileTaskException(HttpUtils.nonRetryableResponse(getResponseCode(httpResponse)));
         } catch (ConnectTimeoutException | UnknownHostException | HttpHostConnectException | SSLHandshakeException
-                | SSLPeerUnverifiedException e) {
-            throw new NonRetryableDatafileTaskException(
-                    "Unable to get file from xNF. No retry attempts will be done.", e);
+            | SSLPeerUnverifiedException e) {
+            throw new NonRetryableDatafileTaskException("Unable to get file from xNF. No retry attempts will be done.",
+                e);
         }
     }
 
-    protected CloseableHttpResponse executeHttpClient(HttpGet httpGet)
-            throws IOException {
+    protected CloseableHttpResponse executeHttpClient(HttpGet httpGet) throws IOException {
         return httpsClient.execute(httpGet);
     }
 
@@ -177,7 +171,8 @@
         return Files.copy(stream, localFile, StandardCopyOption.REPLACE_EXISTING);
     }
 
-    @Override public void close() {
+    @Override
+    public void close() {
         logger.trace("Https client has ended downloading process.");
     }
 }
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpsClientConnectionManagerUtil.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpsClientConnectionManagerUtil.java
index 2563856..29a8301 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpsClientConnectionManagerUtil.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/http/HttpsClientConnectionManagerUtil.java
@@ -15,6 +15,19 @@
  */
 package org.onap.dcaegen2.collectors.datafile.http;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+
 import org.apache.http.config.Registry;
 import org.apache.http.config.RegistryBuilder;
 import org.apache.http.conn.socket.ConnectionSocketFactory;
@@ -30,18 +43,6 @@
 import org.slf4j.LoggerFactory;
 import org.springframework.core.io.FileSystemResource;
 
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.SSLContext;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Paths;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-
 /**
  * Utility class supplying connection manager for HTTPS protocol.
  *
@@ -53,7 +54,7 @@
     }
 
     private static final Logger logger = LoggerFactory.getLogger(HttpsClientConnectionManagerUtil.class);
-    //Be aware to be less than ScheduledTasks.NUMBER_OF_WORKER_THREADS
+    // Note: this value should stay less than ScheduledTasks.NUMBER_OF_WORKER_THREADS
     private static final int MAX_NUMBER_OF_CONNECTIONS = 200;
     private static PoolingHttpClientConnectionManager connectionManager;
 
@@ -65,7 +66,7 @@
     }
 
     public static void setupOrUpdate(String keyCertPath, String keyCertPasswordPath, String trustedCaPath,
-            String trustedCaPasswordPath, Boolean useHostnameVerifier) throws DatafileTaskException {
+        String trustedCaPasswordPath, Boolean useHostnameVerifier) throws DatafileTaskException {
         synchronized (HttpsClientConnectionManagerUtil.class) {
             if (connectionManager != null) {
                 connectionManager.close();
@@ -77,7 +78,7 @@
     }
 
     private static void setup(String keyCertPath, String keyCertPasswordPath, String trustedCaPath,
-          String trustedCaPasswordPath, Boolean useHostnameVerifier) throws DatafileTaskException {
+        String trustedCaPasswordPath, Boolean useHostnameVerifier) throws DatafileTaskException {
         try {
             SSLContextBuilder sslBuilder = SSLContexts.custom();
             sslBuilder = supplyKeyInfo(keyCertPath, keyCertPasswordPath, sslBuilder);
@@ -85,16 +86,15 @@
 
             SSLContext sslContext = sslBuilder.build();
 
-            HostnameVerifier hostnameVerifier = (Boolean.TRUE.equals(useHostnameVerifier)) ? new DefaultHostnameVerifier() :
-                    NoopHostnameVerifier.INSTANCE;
+            HostnameVerifier hostnameVerifier =
+                (Boolean.TRUE.equals(useHostnameVerifier)) ? new DefaultHostnameVerifier()
+                    : NoopHostnameVerifier.INSTANCE;
 
             SSLConnectionSocketFactory sslConnectionSocketFactory =
-                new SSLConnectionSocketFactory(sslContext, new String[] {"TLSv1.2"}, null,
-                        hostnameVerifier);
+                new SSLConnectionSocketFactory(sslContext, new String[] {"TLSv1.2"}, null, hostnameVerifier);
 
             Registry<ConnectionSocketFactory> socketFactoryRegistry =
-                RegistryBuilder.<ConnectionSocketFactory>create().register("https", sslConnectionSocketFactory)
-                    .build();
+                RegistryBuilder.<ConnectionSocketFactory>create().register("https", sslConnectionSocketFactory).build();
 
             connectionManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
             connectionManager.setMaxTotal(MAX_NUMBER_OF_CONNECTIONS);
@@ -105,16 +105,15 @@
     }
 
     private static SSLContextBuilder supplyKeyInfo(String keyCertPath, String keyCertPasswordPath,
-            SSLContextBuilder sslBuilder)
-            throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException,
-            UnrecoverableKeyException {
+        SSLContextBuilder sslBuilder) throws IOException, KeyStoreException, NoSuchAlgorithmException,
+        CertificateException, UnrecoverableKeyException {
         String keyPass = SecurityUtil.getKeystorePasswordFromFile(keyCertPasswordPath);
         KeyStore keyFile = createKeyStore(keyCertPath, keyPass);
         return sslBuilder.loadKeyMaterial(keyFile, keyPass.toCharArray());
     }
 
     private static KeyStore createKeyStore(String trustedCaPath, String trustedCaPassword)
-            throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
+        throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
         logger.trace("Creating trust manager from file: {}", trustedCaPath);
         try (InputStream fis = createInputStream(trustedCaPath)) {
             KeyStore keyStore = KeyStore.getInstance("PKCS12");
@@ -129,8 +128,8 @@
     }
 
     private static SSLContextBuilder supplyTrustInfo(String trustedCaPath, String trustedCaPasswordPath,
-            SSLContextBuilder sslBuilder)
-            throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException {
+        SSLContextBuilder sslBuilder)
+        throws NoSuchAlgorithmException, KeyStoreException, CertificateException, IOException {
         String trustPass = SecurityUtil.getTruststorePasswordFromFile(trustedCaPasswordPath);
         File trustStoreFile = new File(trustedCaPath);
         return sslBuilder.loadTrustMaterial(trustStoreFile, trustPass.toCharArray());
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
index 34f2ac0..f4157f6 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformation.java
@@ -16,62 +16,39 @@
 
 package org.onap.dcaegen2.collectors.datafile.model;
 
-import com.google.gson.annotations.SerializedName;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
 
-import java.nio.file.Path;
-import java.util.Map;
+@Builder
+@EqualsAndHashCode
+public class FilePublishInformation {
 
-import org.immutables.gson.Gson;
-import org.immutables.value.Value;
+    String productName;
 
-/**
- * Information needed to publish a file to DataRouter.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
+    String vendorName;
 
-@Value.Immutable
-@Gson.TypeAdapters
-@Value.Style(redactedMask = "####")
-public interface FilePublishInformation {
+    String lastEpochMicrosec;
 
-    @SerializedName("productName")
-    String getProductName();
+    @Getter
+    String sourceName;
 
-    @SerializedName("vendorName")
-    String getVendorName();
+    String startEpochMicrosec;
 
-    @SerializedName("lastEpochMicrosec")
-    String getLastEpochMicrosec();
+    String timeZoneOffset;
 
-    @SerializedName("sourceName")
-    String getSourceName();
+    String location;
 
-    @SerializedName("startEpochMicrosec")
-    String getStartEpochMicrosec();
+    String compression;
 
-    @SerializedName("timeZoneOffset")
-    String getTimeZoneOffset();
+    String fileFormatType;
 
-    @SerializedName("location")
-    @Value.Redacted
-    String getLocation();
+    String fileFormatVersion;
 
-    @SerializedName("compression")
-    String getCompression();
+    String name;
 
-    @SerializedName("fileFormatType")
-    String getFileFormatType();
+    String changeIdentifier;
 
-    @SerializedName("fileFormatVersion")
-    String getFileFormatVersion();
-
-    Path getInternalLocation();
-
-    String getName();
-
-    Map<String, String> getContext();
-
-    String getChangeIdentifier();
+    @Getter
+    String internalLocation;
 }
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
index 69bfcf4..b22b4dc 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
@@ -19,16 +19,15 @@
 
 package org.onap.dcaegen2.collectors.datafile.service;
 
-import org.apache.hc.core5.http.NameValuePair;
-import org.apache.http.HttpStatus;
-import org.jetbrains.annotations.NotNull;
-import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
-
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.List;
 import java.util.Optional;
 
+import org.apache.hc.core5.http.NameValuePair;
+import org.apache.http.HttpStatus;
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +38,8 @@
     public static final int HTTPS_DEFAULT_PORT = 443;
     public static final String JWT_TOKEN_NAME = "access_token";
     public static final String AUTH_JWT_WARN = "Both JWT token and Basic auth data present. Omitting basic auth info.";
-    public static final String AUTH_JWT_ERROR = "More than one JWT token present in the queryParameters. Omitting JWT token.";
+    public static final String AUTH_JWT_ERROR =
+        "More than one JWT token present in the queryParameters. Omitting JWT token.";
 
     private HttpUtils() {
     }
@@ -78,7 +78,8 @@
      * @param remoteFile file which has to be downloaded
      * @return uri String representing the xNF HTTP location
      */
-    @NotNull public static String prepareHttpUri(FileServerData fileServerData, String remoteFile){
+    @NotNull
+    public static String prepareHttpUri(FileServerData fileServerData, String remoteFile) {
         return prepareUri("http", fileServerData, remoteFile, HTTP_DEFAULT_PORT);
     }
 
@@ -90,7 +91,8 @@
      * @param remoteFile file which has to be downloaded
      * @return uri String representing the xNF HTTPS location
      */
-    @NotNull public static String prepareHttpsUri(FileServerData fileServerData, String remoteFile){
+    @NotNull
+    public static String prepareHttpsUri(FileServerData fileServerData, String remoteFile) {
         return prepareUri("https", fileServerData, remoteFile, HTTPS_DEFAULT_PORT);
     }
 
@@ -104,7 +106,8 @@
      * @param defaultPort default port which exchange empty entry for given connection type
      * @return uri String representing the xNF location
      */
-    @NotNull public static String prepareUri(String scheme, FileServerData fileServerData, String remoteFile, int defaultPort) {
+    @NotNull
+    public static String prepareUri(String scheme, FileServerData fileServerData, String remoteFile, int defaultPort) {
         int port = fileServerData.port().orElse(defaultPort);
         String query = rewriteQueryWithoutToken(fileServerData.queryParameters().orElse(new ArrayList<>()));
         String fragment = fileServerData.uriRawFragment().orElse("");
@@ -200,7 +203,7 @@
      *
      * @param query list of NameValuePair of elements sent in the queryParameters
      * @return String representation of queryParameters elements which were provided in the input
-     *     Empty string is possible when queryParameters is empty or contains only access_token key.
+     *         Empty string is possible when queryParameters is empty or contains only access_token key.
      */
     public static String rewriteQueryWithoutToken(List<NameValuePair> query) {
         if (query.isEmpty()) {
@@ -218,8 +221,8 @@
             }
             sb.append("&");
         }
-        if ((sb.length() > 0) && (sb.charAt(sb.length() - 1 ) == '&')) {
-            sb.deleteCharAt(sb.length() - 1 );
+        if ((sb.length() > 0) && (sb.charAt(sb.length() - 1) == '&')) {
+            sb.deleteCharAt(sb.length() - 1);
         }
         return sb.toString();
     }
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
index 0780e18..6fe0194 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumer.java
@@ -22,6 +22,7 @@
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
 import com.google.gson.JsonElement;
+
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
@@ -59,8 +60,7 @@
         logger.trace("getMessageRouterResponse called");
         try {
             ConsumerConfiguration dmaapConsumerConfiguration = datafileAppConfig.getDmaapConsumerConfiguration();
-            MessageRouterSubscriber messageRouterSubscriber =
-                dmaapConsumerConfiguration.getMessageRouterSubscriber();
+            MessageRouterSubscriber messageRouterSubscriber = dmaapConsumerConfiguration.getMessageRouterSubscriber();
             Flux<JsonElement> responseElements =
                 messageRouterSubscriber.getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
             return consume(responseElements);
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
deleted file mode 100644
index ef2341f..0000000
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- *  Copyright (C) 2019 Nordix Foundation.
- *  Copyright (C) 2020-2021 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
-
-import java.io.File;
-import java.net.URI;
-import java.nio.file.Path;
-import java.time.Duration;
-
-import org.apache.http.HttpResponse;
-import org.apache.http.client.methods.HttpPut;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.FileEntity;
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.model.Counters;
-import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
-import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
-import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-import org.springframework.core.io.FileSystemResource;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-import reactor.core.publisher.Mono;
-import reactor.util.retry.Retry;
-
-/**
- * Publishes a file to the DataRouter.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-public class DataRouterPublisher {
-    private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
-    private static final String CONTENT_TYPE = "application/octet-stream";
-
-    private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
-    private final AppConfig datafileAppConfig;
-    private final Counters counters;
-
-    public DataRouterPublisher(AppConfig datafileAppConfig, Counters counters) {
-        this.datafileAppConfig = datafileAppConfig;
-        this.counters = counters;
-    }
-
-    /**
-     * Publish one file.
-     *
-     * @param publishInfo information about the file to publish
-     * @param numRetries the maximal number of retries if the publishing fails
-     * @param firstBackoff the time to delay the first retry
-     * @return the (same) filePublishInformation
-     */
-    public Mono<FilePublishInformation> publishFile(FilePublishInformation publishInfo, long numRetries,
-        Duration firstBackoff) {
-        MDC.setContextMap(publishInfo.getContext());
-        return Mono.just(publishInfo) //
-            .cache() //
-            .flatMap(this::publishFile) //
-            .flatMap(httpStatus -> handleHttpResponse(httpStatus, publishInfo)) //
-            .retryWhen(Retry.backoff(numRetries,firstBackoff));
-    }
-
-    private Mono<HttpStatus> publishFile(FilePublishInformation publishInfo) {
-        MDC.setContextMap(publishInfo.getContext());
-        logger.trace("Entering publishFile with {}", publishInfo);
-        try {
-            DmaapProducerHttpClient dmaapProducerHttpClient = resolveClient(publishInfo.getChangeIdentifier());
-            HttpPut put = new HttpPut();
-            prepareHead(publishInfo, put);
-            prepareBody(publishInfo, put);
-            dmaapProducerHttpClient.addUserCredentialsToHead(put);
-
-            HttpResponse response =
-                dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
-            logger.trace("{}", response);
-            return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
-        } catch (Exception e) {
-            counters.incNoOfFailedPublishAttempts();
-            logger.warn("Publishing file {} to DR unsuccessful.", publishInfo.getName(), e);
-            return Mono.error(e);
-        }
-    }
-
-    private void prepareHead(FilePublishInformation publishInfo, HttpPut put) throws DatafileTaskException {
-
-        put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
-        JsonElement metaData = JsonParser.parseString(JsonSerializer.createJsonBodyForDataRouter(publishInfo));
-        put.addHeader(X_DMAAP_DR_META, metaData.toString());
-        URI uri = new DefaultUriBuilderFactory(
-            datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) //
-                .builder() //
-                .pathSegment(publishInfo.getName()) //
-                .build();
-        put.setURI(uri);
-
-        MappedDiagnosticContext.appendTraceInfo(put);
-    }
-
-    private void prepareBody(FilePublishInformation publishInfo, HttpPut put) {
-        File file = createInputFile(publishInfo.getInternalLocation());
-        FileEntity entity = new FileEntity(file, ContentType.DEFAULT_BINARY);
-        put.setEntity(entity);
-    }
-
-    private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) {
-        MDC.setContextMap(publishInfo.getContext());
-        if (HttpUtils.isSuccessfulResponseCodeWithDataRouter(response.value())) {
-            counters.incTotalPublishedFiles();
-            logger.trace("Publishing file {} to DR successful!", publishInfo.getName());
-            return Mono.just(publishInfo);
-        } else {
-            counters.incNoOfFailedPublishAttempts();
-            logger.warn("Publishing file {} to DR unsuccessful. Response code: {}", publishInfo.getName(), response);
-            return Mono.error(new Exception(
-                "Publishing file " + publishInfo.getName() + " to DR unsuccessful. Response code: " + response));
-        }
-    }
-
-    File createInputFile(Path filePath) {
-        FileSystemResource realResource = new FileSystemResource(filePath);
-        return realResource.getFile();
-    }
-
-    PublisherConfiguration resolveConfiguration(String changeIdentifer) throws DatafileTaskException {
-        return datafileAppConfig.getPublisherConfiguration(changeIdentifer);
-    }
-
-    DmaapProducerHttpClient resolveClient(String changeIdentifier) throws DatafileTaskException {
-        PublisherConfiguration publisherConfiguration = resolveConfiguration(changeIdentifier);
-        return new DmaapProducerHttpClient(publisherConfiguration);
-
-    }
-}
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
index 0df57a2..0f7a902 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -23,12 +23,12 @@
 import java.util.Map;
 import java.util.Optional;
 
+import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
 import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.CertificateConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.commons.FileCollectClient;
 import org.onap.dcaegen2.collectors.datafile.ftp.FtpesClient;
 import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
 import org.onap.dcaegen2.collectors.datafile.ftp.SftpClientSettings;
@@ -38,11 +38,11 @@
 import org.onap.dcaegen2.collectors.datafile.model.Counters;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
+
 import reactor.core.publisher.Mono;
 import reactor.util.retry.Retry;
 
@@ -84,8 +84,7 @@
         return Mono.just(fileData) //
             .cache() //
             .flatMap(fd -> tryCollectFile(fileData, contextMap)) //
-            .retryWhen(Retry.backoff(numRetries,firstBackoff))
-            .flatMap(FileCollector::checkCollectedFile);
+            .retryWhen(Retry.backoff(numRetries, firstBackoff)).flatMap(FileCollector::checkCollectedFile);
     }
 
     private static Mono<FilePublishInformation> checkCollectedFile(Optional<FilePublishInformation> info) {
@@ -152,7 +151,7 @@
         Map<String, String> context) {
         String location = fileData.location();
         MessageMetaData metaData = fileData.messageMetaData();
-        return ImmutableFilePublishInformation.builder() //
+        return FilePublishInformation.builder() //
             .productName(metaData.productName()) //
             .vendorName(metaData.vendorName()) //
             .lastEpochMicrosec(metaData.lastEpochMicrosec()) //
@@ -161,12 +160,11 @@
             .timeZoneOffset(metaData.timeZoneOffset()) //
             .name(fileData.name()) //
             .location(location) //
-            .internalLocation(localFile) //
+            .internalLocation(localFile.toString()) //
             .compression(fileData.compression()) //
             .fileFormatType(fileData.fileFormatType()) //
             .fileFormatVersion(fileData.fileFormatVersion()) //
             .changeIdentifier(fileData.messageMetaData().changeIdentifier()) //
-            .context(context) //
             .build();
     }
 
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
deleted file mode 100644
index a9973cf..0000000
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedChecker.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*-
-* ============LICENSE_START=======================================================
-*  Copyright (C) 2019 Nordix Foundation.
-*  Copyright (C) 2020 Nokia. All rights reserved.
-* ================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*      http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*
-* SPDX-License-Identifier: Apache-2.0
-* ============LICENSE_END=========================================================
-*/
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.time.Duration;
-import java.util.Map;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.utils.URIBuilder;
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-/**
- * Bean used to check with DataRouter if a file has been published.
- *
- * @author <a href="mailto:maxime.bonneau@est.tech">Maxime Bonneau</a>
- *
- */
-public class PublishedChecker {
-
-    private static final Duration WEB_CLIENT_TIMEOUT = Duration.ofSeconds(4);
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
-    private final AppConfig appConfig;
-
-    /**
-     * Constructor.
-     *
-     * @param appConfig The DFC configuration.
-     */
-    public PublishedChecker(AppConfig appConfig) {
-        this.appConfig = appConfig;
-    }
-
-    /**
-     * Checks with DataRouter if the given file has been published already.
-     *
-     * @param fileName the name of the file used when it is published.
-     *
-     * @return <code>true</code> if the file has been published before, <code>false</code>
-     *         otherwise.
-     * @throws DatafileTaskException if the check fails
-     */
-    public boolean isFilePublished(String fileName, String changeIdentifier, Map<String, String> contextMap)
-        throws DatafileTaskException {
-        MDC.setContextMap(contextMap);
-        PublisherConfiguration publisherConfig = resolveConfiguration(changeIdentifier);
-
-        DmaapProducerHttpClient producerClient = resolveClient(publisherConfig);
-
-        HttpGet getRequest = new HttpGet();
-        MappedDiagnosticContext.appendTraceInfo(getRequest);
-
-        try {
-            getRequest.setURI(getPublishedQueryUri(fileName, publisherConfig));
-            producerClient.addUserCredentialsToHead(getRequest);
-
-            HttpResponse response =
-                producerClient.getDmaapProducerResponseWithCustomTimeout(getRequest, WEB_CLIENT_TIMEOUT, contextMap);
-
-            logger.trace("{}", response);
-            int status = response.getStatusLine().getStatusCode();
-            HttpEntity entity = response.getEntity();
-            try (InputStream content = entity.getContent()) {
-                String body = IOUtils.toString(content);
-                return HttpStatus.SC_OK == status && !"[]".equals(body);
-            }
-        } catch (Exception e) {
-            logger.warn("Unable to check if file has been published, file: {}", fileName, e);
-            return false;
-        }
-    }
-
-    private static URI getPublishedQueryUri(String fileName, PublisherConfiguration config) throws URISyntaxException {
-        return new URIBuilder(config.logUrl()) //
-            .addParameter("type", "pub") //
-            .addParameter("filename", fileName) //
-            .build();
-    }
-
-    protected PublisherConfiguration resolveConfiguration(String changeIdentifier) throws DatafileTaskException {
-        return appConfig.getPublisherConfiguration(changeIdentifier);
-    }
-
-    protected DmaapProducerHttpClient resolveClient(PublisherConfiguration publisherConfig) {
-        return new DmaapProducerHttpClient(publisherConfig);
-    }
-}
diff --git a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index fa1757e..3507df9 100644
--- a/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -16,16 +16,22 @@
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.model.Counters;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
@@ -42,24 +48,31 @@
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
+import reactor.kafka.sender.KafkaSender;
+import reactor.kafka.sender.SenderOptions;
+import reactor.kafka.sender.SenderRecord;
 
 /**
- * This implements the main flow of the data file collector. Fetch file ready events from the
- * message router, fetch new files from the PNF publish these in the data router.
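+ * This implements the main flow of the data file collector. Fetch file ready
+ * events from the message router, fetch new files from the PNF, and report
+ * each collected file on the configured Kafka topic (this replaces the
+ * earlier publishing of the files in the data router).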
  */
 @Component
 public class ScheduledTasks {
 
+    private static final Gson gson = new GsonBuilder() //
+        .disableHtmlEscaping() //
+        .create(); // one shared serializer, used for every collected-file report
+
     private static final int NUMBER_OF_WORKER_THREADS = 200;
     private static final int MAX_TASKS_FOR_POLLING = 50;
-    private static final long DATA_ROUTER_MAX_RETRIES = 5;
-    private static final Duration DATA_ROUTER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2);
     private static final long FILE_TRANSFER_MAX_RETRIES = 3;
     private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(5);
 
     private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
 
-    private final AppConfig applicationConfiguration;
+    private final AppConfig appConfig;
     private final AtomicInteger currentNumberOfTasks;
     private final AtomicInteger threadPoolQueueSize = new AtomicInteger();
     private final AtomicInteger currentNumberOfSubscriptions;
@@ -74,7 +87,7 @@
      */
     @Autowired
     public ScheduledTasks(AppConfig applicationConfiguration) {
-        this.applicationConfiguration = applicationConfiguration;
+        this.appConfig = applicationConfiguration;
         this.currentNumberOfTasks = counters.getCurrentNumberOfTasks();
         this.currentNumberOfSubscriptions = counters.getCurrentNumberOfSubscriptions();
     }
@@ -92,7 +105,7 @@
                     threadPoolQueueSize.get());
                 return;
             }
-            if (this.applicationConfiguration.getDmaapConsumerConfiguration() == null) {
+            if (this.appConfig.getDmaapConsumerConfiguration() == null) {
                 logger.warn("No configuration loaded, skipping polling for messages");
                 return;
             }
@@ -124,16 +137,55 @@
             .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) //
             .flatMap(fileReadyMessage -> Flux.fromIterable(fileReadyMessage.files())) //
             .flatMap(fileData -> createMdcContext(fileData, context)) //
-            .filter(this::isFeedConfigured) //
-            .filter(this::shouldBePublished) //
             .doOnNext(fileData -> currentNumberOfTasks.incrementAndGet()) //
+            .filter(fileData -> !fileExists(fileData)) //
             .flatMap(this::fetchFile, false, 1, 1) //
-            .flatMap(this::publishToDataRouter, false, 1, 1) //
-            .doOnNext(publishInfo -> deleteFile(publishInfo.getInternalLocation(), publishInfo.getContext())) //
+            .doOnNext(this::reportFetchedFile) //
             .doOnNext(publishInfo -> currentNumberOfTasks.decrementAndGet()) //
             .sequential();
     }
 
+    private void reportFetchedFile(FilePublishInformation fileData) {
+        if (this.appConfig.collectedFileTopic == null) {
+            return; // no topic configured: skip the JSON serialization and the send
+        }
+        sendDataToStream(Flux.just(senderRecord(fileData.getSourceName(), gson.toJson(fileData))));
+    }
+
+    private boolean fileExists(FileDataWithContext fileData) {
+        // True when something already exists at the local path of this file.
+        return fileData.fileData.getLocalFilePath().toFile().exists();
+    }
+
+    private SenderRecord<String, String, Integer> senderRecord(String key, String value) {
+        // Wraps key/value in a record for the collected-file topic; the metadata is a fixed 2.
+        int correlation = 2;
+        ProducerRecord<String, String> rec = new ProducerRecord<>(this.appConfig.collectedFileTopic, key, value);
+        return SenderRecord.create(rec, correlation);
+    }
+
+    private void sendDataToStream(Flux<SenderRecord<String, String, Integer>> dataToSend) {
+        final KafkaSender<String, String> sender = KafkaSender.create(senderOptions());
+        try { // blockLast() rethrows a send error, so close() has to run in finally
+            sender.send(dataToSend) //
+                .doOnError(e -> logger.error("Send to kafka failed", e)).blockLast();
+        } finally {
+            sender.close();
+        }
+    }
+
+    private SenderOptions<String, String> senderOptions() {
+        // Producer properties: configured bootstrap servers, String serializers, acks=all.
+        Map<String, Object> config = new HashMap<>();
+        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.appConfig.getKafkaBootStrapServers());
+        config.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
+        config.put(ProducerConfig.ACKS_CONFIG, "all");
+        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        return SenderOptions.create(config);
+    }
+
     private class FileDataWithContext {
         public final FileData fileData;
         public final Map<String, String> context;
@@ -151,24 +203,16 @@
         publishedFilesCache.purge(now);
     }
 
-    protected PublishedChecker createPublishedChecker() {
-        return new PublishedChecker(applicationConfiguration);
-    }
-
     public Counters getCounters() {
         return this.counters;
     }
 
     protected DMaaPMessageConsumer createConsumerTask() {
-        return new DMaaPMessageConsumer(this.applicationConfiguration);
+        return new DMaaPMessageConsumer(this.appConfig);
     }
 
     protected FileCollector createFileCollector() {
-        return new FileCollector(applicationConfiguration, counters);
-    }
-
-    protected DataRouterPublisher createDataRouterPublisher() {
-        return new DataRouterPublisher(applicationConfiguration, counters);
+        return new FileCollector(appConfig, counters);
     }
 
     private static void onComplete(Map<String, String> contextMap) {
@@ -193,7 +237,6 @@
     }
 
     private static synchronized void onSuccess(FilePublishInformation publishInfo) {
-        MDC.setContextMap(publishInfo.getContext());
         logger.info("Datafile file published {}", publishInfo.getInternalLocation());
     }
 
@@ -209,42 +252,9 @@
         return Mono.just(pair);
     }
 
-    private boolean isFeedConfigured(FileDataWithContext fileData) {
-        if (applicationConfiguration.isFeedConfigured(fileData.fileData.messageMetaData().changeIdentifier())) {
-            return true;
-        } else {
-            logger.info("No feed is configured for: {}, file ignored: {}",
-                fileData.fileData.messageMetaData().changeIdentifier(), fileData.fileData.name());
-            return false;
-        }
-    }
-
-    private boolean shouldBePublished(FileDataWithContext fileData) {
-        Path localFilePath = fileData.fileData.getLocalFilePath();
-        boolean shouldBePublished = (publishedFilesCache.put(localFilePath) == null);
-        if (shouldBePublished) {
-            shouldBePublished = checkIfFileIsNotPublishedInDataRouter(fileData);
-        }
-
-        if (!shouldBePublished) {
-            logger.debug("File: {} is being processed or was already published. Skipping.", fileData.fileData.name());
-        }
-        return shouldBePublished;
-    }
-
-    private boolean checkIfFileIsNotPublishedInDataRouter(FileDataWithContext fileData) {
-        boolean isNotPublished = true;
-        try {
-            isNotPublished = !createPublishedChecker().isFilePublished(fileData.fileData.name(),
-                    fileData.fileData.messageMetaData().changeIdentifier(), fileData.context);
-        } catch (DatafileTaskException e) {
-            logger.error("Cannot check if a file {} is published", fileData.fileData.name(), e);
-        }
-        return isNotPublished;
-    }
-
     private Mono<FilePublishInformation> fetchFile(FileDataWithContext fileData) {
         MDC.setContextMap(fileData.context);
+
         return createFileCollector() //
             .collectFile(fileData.fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT,
                 fileData.context) //
@@ -266,27 +276,9 @@
         return Mono.empty();
     }
 
-    private Mono<FilePublishInformation> publishToDataRouter(FilePublishInformation publishInfo) {
-        MDC.setContextMap(publishInfo.getContext());
-
-        return createDataRouterPublisher()
-            .publishFile(publishInfo, DATA_ROUTER_MAX_RETRIES, DATA_ROUTER_INITIAL_RETRY_TIMEOUT)
-            .onErrorResume(exception -> handlePublishFailure(publishInfo));
-    }
-
-    private Mono<FilePublishInformation> handlePublishFailure(FilePublishInformation publishInfo) {
-        MDC.setContextMap(publishInfo.getContext());
-        logger.error("File publishing failed: {}", publishInfo);
-        Path internalFileName = publishInfo.getInternalLocation();
-        deleteFile(internalFileName, publishInfo.getContext());
-        publishedFilesCache.remove(internalFileName);
-        currentNumberOfTasks.decrementAndGet();
-        counters.incNoOfFailedPublish();
-        return Mono.empty();
-    }
-
     /**
-     * Fetch more messages from the message router. This is done in a polling/blocking fashion.
+     * Fetch more messages from the message router. This is done in a
+     * polling/blocking fashion.
      */
     Flux<FileReadyMessage> fetchMoreFileReadyMessages() {
         logger.info(
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
index 3c94029..8cf188b 100644
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
+++ b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
@@ -17,38 +17,6 @@
 
 package org.onap.dcaegen2.collectors.datafile.configuration;
 
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.read.ListAppender;
-import com.google.common.base.Charsets;
-import com.google.common.io.Resources;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonIOException;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
-import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
-import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.test.StepVerifier;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.Properties;
-
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -60,10 +28,46 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonIOException;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
+import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
 /**
  * Tests the AppConfig.
  *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on
+ *         4/9/18
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
  */
 class AppConfigTest {
@@ -90,9 +94,7 @@
             .keyPasswordPath("/src/test/resources/dfc.jks.pass") //
             .trustedCa("/src/test/resources/cert.jks") //
             .trustedCaPasswordPath("/src/test/resources/cert.jks.pass") //
-            .httpsHostnameVerify(true)
-            .enableCertAuth(true)
-            .build();
+            .httpsHostnameVerify(true).enableCertAuth(true).build();
 
     private AppConfig appConfigUnderTest;
     private final Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
@@ -124,9 +126,7 @@
         assertThat(publisherCfg).isEqualToComparingFieldByField(CORRECT_PUBLISHER_CONFIG);
 
         CertificateConfig certificateConfig = appConfigUnderTest.getCertificateConfiguration();
-        assertThat(certificateConfig)
-            .isNotNull()
-            .isEqualToComparingFieldByField(CORRECT_CERTIFICATE_CONFIGURATION);
+        assertThat(certificateConfig).isNotNull().isEqualToComparingFieldByField(CORRECT_CERTIFICATE_CONFIGURATION);
     }
 
     @Test
@@ -164,7 +164,7 @@
     @Test
     void whenFileIsNotExist_ThrowException() throws DatafileTaskException {
         // Given
-        appConfigUnderTest.setFilepath("/temp.json");
+        appConfigUnderTest.filepath = ("/temp.json");
 
         // When
         appConfigUnderTest.loadConfigurationFromFile();
@@ -285,11 +285,11 @@
 
     private void checkCorrectConsumerConfiguration(ConsumerConfiguration consumerConfiguration) {
         MessageRouterSubscribeRequest messageRouterSubscribeRequest =
-                consumerConfiguration.getMessageRouterSubscribeRequest();
+            consumerConfiguration.getMessageRouterSubscribeRequest();
         assertThat(messageRouterSubscribeRequest.consumerGroup()).isEqualTo("OpenDcae-c12");
         assertThat(messageRouterSubscribeRequest.consumerId()).isEqualTo("C12");
         assertThat(messageRouterSubscribeRequest.sourceDefinition().topicUrl())
-                .isEqualTo("http://localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT");
+            .isEqualTo("http://localhost:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT");
         SecurityKeys securityKeys = consumerConfiguration.getMessageRouterSubscriberConfig().securityKeys();
         assertThat(securityKeys.keyStore().path().toString()).hasToString("src/test/resources/cert.jks");
         assertThat(securityKeys.trustStore().path().toString()).hasToString("src/test/resources/trust.jks");
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
index e7ef7d7..57c7e8a 100644
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
+++ b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
@@ -22,9 +22,11 @@
 
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
+
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 
@@ -32,11 +34,13 @@
 
     public static final String CONFIG_TLS_JSON = "src/test/resources/datafile_test_config_tls.json";
     public static final String CONFIG_NO_TLS_JSON = "src/test/resources/datafile_test_config_no_tls.json";
-    public static final String INCORRECT_CERT_CONFIG_JSON = "src/test/resources/datafile_test_config_incorrect_cert_config.json";
-    public static final String EXPECTED_EXCEPTION_MESSAGE = "Wrong configuration. External certificate enabled but configs are missing: Could not find member: dmaap.certificateConfig.keyCert";
+    public static final String INCORRECT_CERT_CONFIG_JSON =
+        "src/test/resources/datafile_test_config_incorrect_cert_config.json";
+    public static final String EXPECTED_EXCEPTION_MESSAGE =
+        "Wrong configuration. External certificate enabled but configs are missing: Could not find member: dmaap.certificateConfig.keyCert";
 
     @Test
-    public void shouldCorrectReadCertificateConfigWithTLS () throws IOException, DatafileTaskException {
+    public void shouldCorrectReadCertificateConfigWithTLS() throws IOException, DatafileTaskException {
 
         CloudConfigParser parser = getCloudConfigParser(CONFIG_TLS_JSON);
         CertificateConfig certificateConfig = parser.getCertificateConfig();
@@ -50,7 +54,7 @@
     }
 
     @Test
-    public void shouldCorrectReadCertificateConfigWithoutTLS () throws IOException, DatafileTaskException {
+    public void shouldCorrectReadCertificateConfigWithoutTLS() throws IOException, DatafileTaskException {
         CloudConfigParser parser = getCloudConfigParser(CONFIG_NO_TLS_JSON);
         CertificateConfig certificateConfig = parser.getCertificateConfig();
 
@@ -62,7 +66,7 @@
     }
 
     @Test
-    public void shouldThrowExceptionWhenCertAuthIsEnabledButPathsPropertyIsMissing () throws IOException {
+    public void shouldThrowExceptionWhenCertAuthIsEnabledButPathsPropertyIsMissing() throws IOException {
         CloudConfigParser parser = getCloudConfigParser(INCORRECT_CERT_CONFIG_JSON);
 
         DatafileTaskException exception = assertThrows(DatafileTaskException.class, parser::getCertificateConfig);
@@ -73,6 +77,6 @@
         String jsonStr = Files.readString(Path.of(configPath));
         JsonObject jsonObject = JsonParser.parseString(jsonStr).getAsJsonObject();
 
-        return new CloudConfigParser(jsonObject,null);
+        return new CloudConfigParser(jsonObject, null);
     }
 }
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java
index cca563f..b8449bb 100644
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java
+++ b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java
@@ -21,8 +21,8 @@
 
 package org.onap.dcaegen2.collectors.datafile.configuration;
 
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleControllerTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleControllerTest.java
index b981619..88ad8cc 100644
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleControllerTest.java
+++ b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleControllerTest.java
@@ -119,7 +119,7 @@
         assertEquals("ENTRY", logAppender.list.get(0).getMarker().getName());
         assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("InvocationID"));
         assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("RequestID"));
-        assertTrue(logAppender.list.toString().contains("[INFO] " + infoMessage),"Info missing in log");
+        assertTrue(logAppender.list.toString().contains("[INFO] " + infoMessage), "Info missing in log");
         assertEquals("EXIT", logAppender.list.get(1).getMarker().getName());
         logAppender.stop();
     }
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusControllerTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusControllerTest.java
index fbff041..f898d13 100644
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusControllerTest.java
+++ b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controllers/StatusControllerTest.java
@@ -93,7 +93,7 @@
         assertEquals("ENTRY", logAppender.list.get(0).getMarker().getName());
         assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("InvocationID"));
         assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("RequestID"));
-        assertTrue(logAppender.list.toString().contains("[INFO] Heartbeat request"),"Info missing in log");
+        assertTrue(logAppender.list.toString().contains("[INFO] Heartbeat request"), "Info missing in log");
         assertEquals("EXIT", logAppender.list.get(1).getMarker().getName());
         logAppender.stop();
     }
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClientTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClientTest.java
index 2b8df34..44ea44b 100644
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClientTest.java
+++ b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpClientTest.java
@@ -15,24 +15,6 @@
  */
 package org.onap.dcaegen2.collectors.datafile.http;
 
-import org.apache.hc.core5.net.URIBuilder;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
-import reactor.core.publisher.Flux;
-import reactor.netty.http.client.HttpClientConfig;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.nio.file.Path;
-
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -44,6 +26,25 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+
+import org.apache.hc.core5.net.URIBuilder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+
+import reactor.core.publisher.Flux;
+import reactor.netty.http.client.HttpClientConfig;
+
 @ExtendWith(MockitoExtension.class)
 class DfcHttpClientTest {
 
@@ -73,11 +74,8 @@
 
     @Test
     void openConnection_failedBasicAuthSetupThrowException() {
-        ImmutableFileServerData serverData = ImmutableFileServerData.builder()
-            .serverAddress(XNF_ADDRESS)
-            .userId(USERNAME).password("")
-            .port(PORT)
-            .build();
+        ImmutableFileServerData serverData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
+            .userId(USERNAME).password("").port(PORT).build();
 
         DfcHttpClient dfcHttpClientSpy = spy(new DfcHttpClient(serverData));
 
@@ -85,7 +83,6 @@
             .hasMessageContaining("Not sufficient basic auth data for file.");
     }
 
-
     @Test
     void collectFile_AllOk() throws Exception {
         String REMOTE_FILE = "any";
@@ -149,22 +146,14 @@
     }
 
     private ImmutableFileServerData createFileServerData() {
-        return ImmutableFileServerData.builder()
-                .serverAddress(XNF_ADDRESS)
-                .userId(USERNAME).password(PASSWORD)
-                .port(PORT)
-                .build();
+        return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS).userId(USERNAME).password(PASSWORD)
+            .port(PORT).build();
     }
 
     private ImmutableFileServerData fileServerDataWithJWTToken() throws URISyntaxException {
         String query = "?" + ACCESS_TOKEN + "=" + JWT_PASSWORD;
 
-        return ImmutableFileServerData.builder()
-                .serverAddress(XNF_ADDRESS)
-                .userId("")
-                .password("")
-                .port(PORT)
-                .queryParameters(new URIBuilder(query).getQueryParams())
-                .build();
+        return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS).userId("").password("").port(PORT)
+            .queryParameters(new URIBuilder(query).getQueryParams()).build();
     }
 }
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpsClientTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpsClientTest.java
index 168bb08..c1e8c2e 100644
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpsClientTest.java
+++ b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/DfcHttpsClientTest.java
@@ -15,6 +15,21 @@
  */
 package org.onap.dcaegen2.collectors.datafile.http;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
 
 import org.apache.hc.core5.net.URIBuilder;
 import org.apache.http.HttpResponse;
@@ -31,22 +46,6 @@
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.nio.file.Path;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
 @ExtendWith(MockitoExtension.class)
 class DfcHttpsClientTest {
 
@@ -72,7 +71,7 @@
 
     @Test
     void fileServerData_properLocationBasicAuth() throws Exception {
-        boolean result  = dfcHttpsClientSpy.basicAuthValidNotPresentOrThrow();
+        boolean result = dfcHttpsClientSpy.basicAuthValidNotPresentOrThrow();
         assertEquals(true, result);
     }
 
@@ -80,7 +79,7 @@
     void fileServerData_properLocationNoBasicAuth() throws Exception {
         dfcHttpsClientSpy = spy(new DfcHttpsClient(emptyUserInFileServerData(), connectionManager));
 
-        boolean result  = dfcHttpsClientSpy.basicAuthValidNotPresentOrThrow();
+        boolean result = dfcHttpsClientSpy.basicAuthValidNotPresentOrThrow();
         assertEquals(false, result);
     }
 
@@ -95,19 +94,16 @@
     void dfcHttpsClient_flow_successfulCallAndResponseProcessing() throws Exception {
         doReturn(HttpClientResponseHelper.APACHE_RESPONSE_OK).when(dfcHttpsClientSpy)
             .executeHttpClient(any(HttpGet.class));
-        doReturn((long)3).when(dfcHttpsClientSpy).writeFile(eq(localFile), any(InputStream.class));
+        doReturn((long) 3).when(dfcHttpsClientSpy).writeFile(eq(localFile), any(InputStream.class));
 
         dfcHttpsClientSpy.open();
         dfcHttpsClientSpy.collectFile(remoteFile, localFile);
         dfcHttpsClientSpy.close();
 
         verify(dfcHttpsClientSpy, times(1)).makeCall(any(HttpGet.class));
-        verify(dfcHttpsClientSpy, times(1))
-            .executeHttpClient(any(HttpGet.class));
-        verify(dfcHttpsClientSpy, times(1))
-            .processResponse(HttpClientResponseHelper.APACHE_RESPONSE_OK, localFile);
-        verify(dfcHttpsClientSpy, times(1))
-            .writeFile(eq(localFile), any(InputStream.class));
+        verify(dfcHttpsClientSpy, times(1)).executeHttpClient(any(HttpGet.class));
+        verify(dfcHttpsClientSpy, times(1)).processResponse(HttpClientResponseHelper.APACHE_RESPONSE_OK, localFile);
+        verify(dfcHttpsClientSpy, times(1)).writeFile(eq(localFile), any(InputStream.class));
     }
 
     @Test
@@ -116,20 +112,17 @@
         dfcHttpsClientSpy = spy(new DfcHttpsClient(serverData, connectionManager));
 
         doReturn(HttpClientResponseHelper.APACHE_RESPONSE_OK).when(dfcHttpsClientSpy)
-                .executeHttpClient(any(HttpGet.class));
-        doReturn((long)3).when(dfcHttpsClientSpy).writeFile(eq(localFile), any(InputStream.class));
+            .executeHttpClient(any(HttpGet.class));
+        doReturn((long) 3).when(dfcHttpsClientSpy).writeFile(eq(localFile), any(InputStream.class));
 
         dfcHttpsClientSpy.open();
         dfcHttpsClientSpy.collectFile(remoteFile, localFile);
         dfcHttpsClientSpy.close();
 
         verify(dfcHttpsClientSpy, times(1)).makeCall(any(HttpGet.class));
-        verify(dfcHttpsClientSpy, times(1))
-                .executeHttpClient(any(HttpGet.class));
-        verify(dfcHttpsClientSpy, times(1))
-                .processResponse(HttpClientResponseHelper.APACHE_RESPONSE_OK, localFile);
-        verify(dfcHttpsClientSpy, times(1))
-                .writeFile(eq(localFile), any(InputStream.class));
+        verify(dfcHttpsClientSpy, times(1)).executeHttpClient(any(HttpGet.class));
+        verify(dfcHttpsClientSpy, times(1)).processResponse(HttpClientResponseHelper.APACHE_RESPONSE_OK, localFile);
+        verify(dfcHttpsClientSpy, times(1)).writeFile(eq(localFile), any(InputStream.class));
         assertFalse(serverData.toString().contains(JWT_PASSWORD));
     }
 
@@ -141,64 +134,46 @@
 
         dfcHttpsClientSpy.open();
 
-        assertThrows(DatafileTaskException.class,
-                () -> dfcHttpsClientSpy.collectFile(remoteFile, localFile));
+        assertThrows(DatafileTaskException.class, () -> dfcHttpsClientSpy.collectFile(remoteFile, localFile));
     }
 
     @Test
     void dfcHttpsClient_flow_failedCallConnectionTimeout() throws Exception {
-        doThrow(ConnectTimeoutException.class).when(dfcHttpsClientSpy)
-            .executeHttpClient(any(HttpGet.class));
+        doThrow(ConnectTimeoutException.class).when(dfcHttpsClientSpy).executeHttpClient(any(HttpGet.class));
 
         dfcHttpsClientSpy.open();
 
         assertThrows(NonRetryableDatafileTaskException.class,
-                () -> dfcHttpsClientSpy.collectFile(remoteFile, localFile));
+            () -> dfcHttpsClientSpy.collectFile(remoteFile, localFile));
     }
 
     @Test
     void dfcHttpsClient_flow_failedCallIOExceptionForExecuteHttpClient() throws Exception {
-        doThrow(IOException.class).when(dfcHttpsClientSpy)
-            .executeHttpClient(any(HttpGet.class));
+        doThrow(IOException.class).when(dfcHttpsClientSpy).executeHttpClient(any(HttpGet.class));
 
         dfcHttpsClientSpy.open();
 
-        assertThrows(DatafileTaskException.class,
-                () -> dfcHttpsClientSpy.collectFile(remoteFile, localFile));
+        assertThrows(DatafileTaskException.class, () -> dfcHttpsClientSpy.collectFile(remoteFile, localFile));
     }
 
     private ImmutableFileServerData createFileServerData() {
-        return ImmutableFileServerData.builder()
-                .serverAddress(XNF_ADDRESS)
-                .userId(USERNAME).password(PASSWORD)
-                .port(PORT)
-                .build();
+        return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS).userId(USERNAME).password(PASSWORD)
+            .port(PORT).build();
     }
 
     private ImmutableFileServerData emptyUserInFileServerData() {
-        return ImmutableFileServerData.builder()
-                .serverAddress(XNF_ADDRESS)
-                .userId("").password("")
-                .port(PORT)
-                .build();
+        return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS).userId("").password("").port(PORT).build();
     }
 
     private ImmutableFileServerData invalidUserInFileServerData() {
-        return ImmutableFileServerData.builder()
-                .serverAddress(XNF_ADDRESS)
-                .userId(USERNAME).password("")
-                .port(PORT)
-                .build();
+        return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS).userId(USERNAME).password("").port(PORT)
+            .build();
     }
 
     private ImmutableFileServerData jWTTokenInFileServerData() throws URISyntaxException {
         String query = "?" + ACCESS_TOKEN + "=" + JWT_PASSWORD;
 
-        return ImmutableFileServerData.builder()
-                .serverAddress(XNF_ADDRESS)
-                .userId("").password("")
-                .port(PORT)
-                .queryParameters(new URIBuilder(query).getQueryParams())
-                .build();
+        return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS).userId("").password("").port(PORT)
+            .queryParameters(new URIBuilder(query).getQueryParams()).build();
     }
 }
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpClientResponseHelper.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpClientResponseHelper.java
index 22067d0..92fecda 100644
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpClientResponseHelper.java
+++ b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpClientResponseHelper.java
@@ -20,16 +20,6 @@
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.codec.http.cookie.Cookie;
-import org.apache.http.Header;
-import org.apache.http.HeaderIterator;
-import org.apache.http.HttpEntity;
-import org.apache.http.ProtocolVersion;
-import org.apache.http.StatusLine;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.params.HttpParams;
-import reactor.netty.http.client.HttpClientResponse;
-import reactor.util.context.Context;
-import reactor.util.context.ContextView;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -39,6 +29,18 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.http.Header;
+import org.apache.http.HeaderIterator;
+import org.apache.http.HttpEntity;
+import org.apache.http.ProtocolVersion;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.params.HttpParams;
+
+import reactor.netty.http.client.HttpClientResponse;
+import reactor.util.context.Context;
+import reactor.util.context.ContextView;
+
 public class HttpClientResponseHelper {
 
     public static final HttpClientResponse NETTY_RESPONSE_OK = new HttpClientResponse() {
@@ -166,193 +168,240 @@
             return null;
         }
 
-        @Override public Context currentContext() {
+        @Override
+        public Context currentContext() {
             return null;
         }
 
-        @Override public ContextView currentContextView() {
+        @Override
+        public ContextView currentContextView() {
             return null;
         }
 
-        @Override public String[] redirectedFrom() {
+        @Override
+        public String[] redirectedFrom() {
             return new String[0];
         }
 
-        @Override public HttpHeaders requestHeaders() {
+        @Override
+        public HttpHeaders requestHeaders() {
             return null;
         }
 
-        @Override public String resourceUrl() {
+        @Override
+        public String resourceUrl() {
             return null;
         }
 
-        @Override public HttpHeaders responseHeaders() {
+        @Override
+        public HttpHeaders responseHeaders() {
             return null;
         }
 
-        @Override public HttpResponseStatus status() {
+        @Override
+        public HttpResponseStatus status() {
             return HttpResponseStatus.NOT_IMPLEMENTED;
         }
     };
 
     public static final CloseableHttpResponse APACHE_RESPONSE_OK = new CloseableHttpResponse() {
-        @Override public void close() throws IOException {
+        @Override
+        public void close() throws IOException {
             getEntity().getContent().close();
         }
 
-        @Override public StatusLine getStatusLine() {
+        @Override
+        public StatusLine getStatusLine() {
             return new StatusLine() {
-                @Override public ProtocolVersion getProtocolVersion() {
+                @Override
+                public ProtocolVersion getProtocolVersion() {
                     return null;
                 }
 
-                @Override public int getStatusCode() {
+                @Override
+                public int getStatusCode() {
                     return 200;
                 }
 
-                @Override public String getReasonPhrase() {
+                @Override
+                public String getReasonPhrase() {
                     return null;
                 }
             };
         }
 
-        @Override public void setStatusLine(StatusLine statusLine) {
+        @Override
+        public void setStatusLine(StatusLine statusLine) {
 
         }
 
-        @Override public void setStatusLine(ProtocolVersion protocolVersion, int i) {
+        @Override
+        public void setStatusLine(ProtocolVersion protocolVersion, int i) {
 
         }
 
-        @Override public void setStatusLine(ProtocolVersion protocolVersion, int i, String s) {
+        @Override
+        public void setStatusLine(ProtocolVersion protocolVersion, int i, String s) {
 
         }
 
-        @Override public void setStatusCode(int i) throws IllegalStateException {
+        @Override
+        public void setStatusCode(int i) throws IllegalStateException {
 
         }
 
-        @Override public void setReasonPhrase(String s) throws IllegalStateException {
+        @Override
+        public void setReasonPhrase(String s) throws IllegalStateException {
 
         }
 
-        @Override public HttpEntity getEntity() {
+        @Override
+        public HttpEntity getEntity() {
             return new HttpEntity() {
-                @Override public boolean isRepeatable() {
+                @Override
+                public boolean isRepeatable() {
                     return false;
                 }
 
-                @Override public boolean isChunked() {
+                @Override
+                public boolean isChunked() {
                     return false;
                 }
 
-                @Override public long getContentLength() {
+                @Override
+                public long getContentLength() {
                     return 0;
                 }
 
-                @Override public Header getContentType() {
+                @Override
+                public Header getContentType() {
                     return null;
                 }
 
-                @Override public Header getContentEncoding() {
+                @Override
+                public Header getContentEncoding() {
                     return null;
                 }
 
-                @Override public InputStream getContent() throws IOException, UnsupportedOperationException {
+                @Override
+                public InputStream getContent() throws IOException, UnsupportedOperationException {
                     return new ByteArrayInputStream("abc".getBytes());
                 }
 
-                @Override public void writeTo(OutputStream outputStream) throws IOException {
+                @Override
+                public void writeTo(OutputStream outputStream) throws IOException {
 
                 }
 
-                @Override public boolean isStreaming() {
+                @Override
+                public boolean isStreaming() {
                     return false;
                 }
 
-                @Override public void consumeContent() throws IOException {
+                @Override
+                public void consumeContent() throws IOException {
 
                 }
             };
         }
 
-        @Override public void setEntity(HttpEntity httpEntity) {
+        @Override
+        public void setEntity(HttpEntity httpEntity) {
 
         }
 
-        @Override public Locale getLocale() {
+        @Override
+        public Locale getLocale() {
             return null;
         }
 
-        @Override public void setLocale(Locale locale) {
+        @Override
+        public void setLocale(Locale locale) {
 
         }
 
-        @Override public ProtocolVersion getProtocolVersion() {
+        @Override
+        public ProtocolVersion getProtocolVersion() {
             return null;
         }
 
-        @Override public boolean containsHeader(String s) {
+        @Override
+        public boolean containsHeader(String s) {
             return false;
         }
 
-        @Override public Header[] getHeaders(String s) {
+        @Override
+        public Header[] getHeaders(String s) {
             return new Header[0];
         }
 
-        @Override public Header getFirstHeader(String s) {
+        @Override
+        public Header getFirstHeader(String s) {
             return null;
         }
 
-        @Override public Header getLastHeader(String s) {
+        @Override
+        public Header getLastHeader(String s) {
             return null;
         }
 
-        @Override public Header[] getAllHeaders() {
+        @Override
+        public Header[] getAllHeaders() {
             return new Header[0];
         }
 
-        @Override public void addHeader(Header header) {
+        @Override
+        public void addHeader(Header header) {
 
         }
 
-        @Override public void addHeader(String s, String s1) {
+        @Override
+        public void addHeader(String s, String s1) {
 
         }
 
-        @Override public void setHeader(Header header) {
+        @Override
+        public void setHeader(Header header) {
 
         }
 
-        @Override public void setHeader(String s, String s1) {
+        @Override
+        public void setHeader(String s, String s1) {
 
         }
 
-        @Override public void setHeaders(Header[] headers) {
+        @Override
+        public void setHeaders(Header[] headers) {
 
         }
 
-        @Override public void removeHeader(Header header) {
+        @Override
+        public void removeHeader(Header header) {
 
         }
 
-        @Override public void removeHeaders(String s) {
+        @Override
+        public void removeHeaders(String s) {
 
         }
 
-        @Override public HeaderIterator headerIterator() {
+        @Override
+        public HeaderIterator headerIterator() {
             return null;
         }
 
-        @Override public HeaderIterator headerIterator(String s) {
+        @Override
+        public HeaderIterator headerIterator(String s) {
             return null;
         }
 
-        @Override public HttpParams getParams() {
+        @Override
+        public HttpParams getParams() {
             return null;
         }
 
-        @Override public void setParams(HttpParams httpParams) {
+        @Override
+        public void setParams(HttpParams httpParams) {
 
         }
     };
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpsClientConnectionManagerUtilTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpsClientConnectionManagerUtilTest.java
index 6328dd3..bb1a93f 100644
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpsClientConnectionManagerUtilTest.java
+++ b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/http/HttpsClientConnectionManagerUtilTest.java
@@ -15,15 +15,14 @@
  */
 package org.onap.dcaegen2.collectors.datafile.http;
 
-import org.apache.http.conn.ssl.DefaultHostnameVerifier;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
 import org.junit.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
 @ExtendWith(MockitoExtension.class)
 public class HttpsClientConnectionManagerUtilTest {
 
@@ -41,14 +40,14 @@
     @Test
     public void creatingManager_successfulCase() throws Exception {
         HttpsClientConnectionManagerUtil.setupOrUpdate(KEY_PATH, KEY_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD, //
-                true);
+            true);
         assertNotNull(HttpsClientConnectionManagerUtil.instance());
     }
 
     @Test
     public void creatingManager_improperSecretShouldThrowException() {
         assertThrows(DatafileTaskException.class, () -> HttpsClientConnectionManagerUtil.setupOrUpdate(KEY_PATH, //
-                KEY_IMPROPER_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD, true));
+            KEY_IMPROPER_PASSWORD, TRUSTED_CA_PATH, TRUSTED_CA_PASSWORD, true));
         assertThrows(DatafileTaskException.class, () -> HttpsClientConnectionManagerUtil.instance());
     }
 
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
index a446050..ea70f43 100644
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
+++ b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
@@ -19,9 +19,9 @@
 
 package org.onap.dcaegen2.collectors.datafile.model;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.commons.FileServerData;
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/scheme/SchemeTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/scheme/SchemeTest.java
index 61adef9..413cd13 100644
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/scheme/SchemeTest.java
+++ b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/scheme/SchemeTest.java
@@ -22,10 +22,9 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 
 public class SchemeTest {
 
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java
index 953f322..4dcd771 100644
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java
+++ b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java
@@ -17,17 +17,17 @@
 
 package org.onap.dcaegen2.collectors.datafile.service;
 
-import org.apache.hc.core5.http.NameValuePair;
-import org.apache.hc.core5.net.URIBuilder;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.net.URISyntaxException;
 import java.util.List;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.apache.hc.core5.http.NameValuePair;
+import org.apache.hc.core5.net.URIBuilder;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.commons.ImmutableFileServerData;
 
 class HttpUtilsTest {
 
@@ -77,10 +77,8 @@
 
     @Test
     void prepareUri_UriWithoutPort() {
-        ImmutableFileServerData serverData = ImmutableFileServerData.builder()
-                .serverAddress(XNF_ADDRESS)
-                .userId(USERNAME).password(PASSWORD)
-                .build();
+        ImmutableFileServerData serverData =
+            ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS).userId(USERNAME).password(PASSWORD).build();
         String REMOTE_FILE = "any";
 
         String retrievedUri = HttpUtils.prepareUri("http", serverData, REMOTE_FILE, 80);
@@ -90,10 +88,10 @@
     @Test
     void prepareUri_verifyUriWithTokenAndFragment() throws URISyntaxException {
         String file = "/file";
-        String expected = "http://" + XNF_ADDRESS + ":" + PORT + file + "?"
-            + ANOTHER_TOKEN + "=" + ANOTHER_DATA + "&" + ANOTHER_TOKEN + "=" + ANOTHER_DATA + "&"
-            + ANOTHER_TOKEN + "=" + ANOTHER_DATA + "#" + FRAGMENT;
-        assertEquals(expected, HttpUtils.prepareUri("http", fileServerDataWithJWTTokenLongQueryAndFragment(), file, 443));
+        String expected = "http://" + XNF_ADDRESS + ":" + PORT + file + "?" + ANOTHER_TOKEN + "=" + ANOTHER_DATA + "&"
+            + ANOTHER_TOKEN + "=" + ANOTHER_DATA + "&" + ANOTHER_TOKEN + "=" + ANOTHER_DATA + "#" + FRAGMENT;
+        assertEquals(expected,
+            HttpUtils.prepareUri("http", fileServerDataWithJWTTokenLongQueryAndFragment(), file, 443));
     }
 
     @Test
@@ -131,13 +129,8 @@
     private ImmutableFileServerData fileServerDataWithJWTToken() throws URISyntaxException {
         String query = "?" + ACCESS_TOKEN + "=" + JWT_PASSWORD;
 
-        return ImmutableFileServerData.builder()
-                .serverAddress(XNF_ADDRESS)
-                .userId("")
-                .password("")
-                .port(PORT)
-                .queryParameters(new URIBuilder(query).getQueryParams())
-                .build();
+        return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS).userId("").password("").port(PORT)
+            .queryParameters(new URIBuilder(query).getQueryParams()).build();
     }
 
     private ImmutableFileServerData fileServerDataWithJWTTokenLongQueryAndFragment() throws URISyntaxException {
@@ -147,37 +140,20 @@
         query.append(ACCESS_TOKEN + "=" + JWT_PASSWORD + "&");
         query.append(ANOTHER_TOKEN + "=" + ANOTHER_DATA);
 
-        return ImmutableFileServerData.builder()
-                .serverAddress(XNF_ADDRESS)
-                .userId("")
-                .password("")
-                .port(PORT)
-                .queryParameters(new URIBuilder(query.toString()).getQueryParams())
-                .uriRawFragment(FRAGMENT)
-                .build();
+        return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS).userId("").password("").port(PORT)
+            .queryParameters(new URIBuilder(query.toString()).getQueryParams()).uriRawFragment(FRAGMENT).build();
     }
 
     private ImmutableFileServerData fileServerDataQueryWithoutToken() throws URISyntaxException {
         StringBuilder query = new StringBuilder();
         query.append("?" + ANOTHER_TOKEN + "=" + ANOTHER_DATA);
 
-        return ImmutableFileServerData.builder()
-                .serverAddress(XNF_ADDRESS)
-                .userId("")
-                .password("")
-                .port(PORT)
-                .queryParameters(new URIBuilder(query.toString()).getQueryParams())
-                .build();
+        return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS).userId("").password("").port(PORT)
+            .queryParameters(new URIBuilder(query.toString()).getQueryParams()).build();
     }
 
     private ImmutableFileServerData fileServerDataNoTokenNoFragment() throws URISyntaxException {
-        return ImmutableFileServerData.builder()
-                .serverAddress(XNF_ADDRESS)
-                .userId("")
-                .password("")
-                .port(PORT)
-                .queryParameters(new URIBuilder("").getQueryParams())
-                .uriRawFragment("")
-                .build();
+        return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS).userId("").password("").port(PORT)
+            .queryParameters(new URIBuilder("").getQueryParams()).uriRawFragment("").build();
     }
 }
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index c7ef8da..594ea4f 100644
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
+++ b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -240,11 +240,13 @@
         StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectNextCount(0).verifyComplete();
 
-        assertTrue(logAppender.list.toString()
+        assertTrue(
+            logAppender.list.toString()
                 .contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING
-                    + Scheme.DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG + "unreal" + Scheme.SUPPORTED_PROTOCOLS_ERROR_MESSAGE
-                    + ". Location: unreal://location.xml"),"Error missing in log");
-        assertTrue(logAppender.list.toString().contains("sourceName=5GRAN_DU"),"Missing sourceName in log");
+                    + Scheme.DFC_DOES_NOT_SUPPORT_PROTOCOL_ERROR_MSG + "unreal"
+                    + Scheme.SUPPORTED_PROTOCOLS_ERROR_MESSAGE + ". Location: unreal://location.xml"),
+            "Error missing in log");
+        assertTrue(logAppender.list.toString().contains("sourceName=5GRAN_DU"), "Missing sourceName in log");
     }
 
     @Test
@@ -324,8 +326,11 @@
         StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectComplete().verify();
 
-        assertTrue(logAppender.list.toString().contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING
-                + "Can not get PRODUCT_NAME from eventName, eventName is not in correct format: Faulty event name"),"Error missing in log");
+        assertTrue(
+            logAppender.list.toString()
+                .contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING
+                    + "Can not get PRODUCT_NAME from eventName, eventName is not in correct format: Faulty event name"),
+            "Error missing in log");
     }
 
     @Test
@@ -354,10 +359,9 @@
         StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectNextCount(0).verifyComplete();
 
-        assertTrue(logAppender.list.toString()
-                .contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING
-                    + "File information wrong. Missing data: [name] Data: "
-                    + message.getAdditionalFields().get(0).toString()),"Error missing in log");
+        assertTrue(logAppender.list.toString().contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING
+            + "File information wrong. Missing data: [name] Data: " + message.getAdditionalFields().get(0).toString()),
+            "Error missing in log");
     }
 
     @Test
@@ -380,7 +384,7 @@
             .expectNextCount(0).verifyComplete();
 
         assertTrue(logAppender.list.toString().contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING
-                + "Missing arrayOfNamedHashMap in message. " + message.getParsed()),"Error missing in log");
+            + "Missing arrayOfNamedHashMap in message. " + message.getParsed()), "Error missing in log");
     }
 
     @Test
@@ -410,9 +414,10 @@
             .expectNextCount(0).verifyComplete();
 
         assertTrue(logAppender.list.toString()
-                .contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING
-                    + "File information wrong. Missing data: [compression] Data: "
-                    + message.getAdditionalFields().get(0).toString()),"Error missing in log");
+            .contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING
+                + "File information wrong. Missing data: [compression] Data: "
+                + message.getAdditionalFields().get(0).toString()),
+            "Error missing in log");
     }
 
     @Test
@@ -442,9 +447,10 @@
             .expectNextCount(0).verifyComplete();
 
         assertTrue(logAppender.list.toString()
-                .contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING
-                    + "File information wrong. Missing data: [fileFormatType] Data: "
-                    + message.getAdditionalFields().get(0).toString()),"Error missing in log");
+            .contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING
+                + "File information wrong. Missing data: [fileFormatType] Data: "
+                + message.getAdditionalFields().get(0).toString()),
+            "Error missing in log");
     }
 
     @Test
@@ -522,10 +528,12 @@
         StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectComplete().verify();
 
-        assertTrue(logAppender.list.toString()
+        assertTrue(
+            logAppender.list.toString()
                 .contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING
                     + "Missing data: [changeIdentifier, changeType, notificationFieldsVersion]. "
-                    + "Change type is wrong:  Expected: FileReady Message: " + message.getParsed()),"Error missing in log");
+                    + "Change type is wrong:  Expected: FileReady Message: " + message.getParsed()),
+            "Error missing in log");
     }
 
     @Test
@@ -540,8 +548,8 @@
         StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectComplete().verify();
 
-        assertTrue(logAppender.list.toString().contains(ERROR_LOG_TAG + "Incorrect JsonObject - missing header. ")
-            ,"Error missing in log");
+        assertTrue(logAppender.list.toString().contains(ERROR_LOG_TAG + "Incorrect JsonObject - missing header. "),
+            "Error missing in log");
     }
 
     @Test
@@ -570,9 +578,11 @@
         StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Flux.just(jsonElement))).expectSubscription()
             .expectNextCount(0).expectComplete().verify();
 
-        assertTrue(logAppender.list.toString()
+        assertTrue(
+            logAppender.list.toString()
                 .contains(ERROR_LOG_TAG + JsonMessageParser.ERROR_MSG_VES_EVENT_PARSING + " Change type is wrong: "
-                    + INCORRECT_CHANGE_TYPE + " Expected: FileReady Message: " + message.getParsed()),"Error missing in log");
+                    + INCORRECT_CHANGE_TYPE + " Expected: FileReady Message: " + message.getParsed()),
+            "Error missing in log");
     }
 
     @Test
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
index addd77d..c12598e 100644
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
+++ b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
@@ -16,9 +16,9 @@
 
 package org.onap.dcaegen2.collectors.datafile.service.producer;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
index e68913f..69b6d42 100644
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
+++ b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
@@ -21,19 +21,33 @@
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
@@ -43,24 +57,10 @@
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+
 import reactor.core.publisher.Flux;
 import reactor.test.StepVerifier;
 
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Optional;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
 public class DMaaPMessageConsumerTest {
     private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady";
     private static final String PRODUCT_NAME = "NrRadio";
@@ -178,7 +178,7 @@
             .messageMetaData(messageMetaData) //
             .build();
 
-        ImmutableFilePublishInformation filePublishInformation = ImmutableFilePublishInformation.builder() //
+        FilePublishInformation filePublishInformation = FilePublishInformation.builder() //
             .productName(PRODUCT_NAME) //
             .vendorName(VENDOR_NAME) //
             .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
@@ -187,12 +187,11 @@
             .timeZoneOffset(TIME_ZONE_OFFSET) //
             .name(PM_FILE_NAME) //
             .location(FTPES_LOCATION) //
-            .internalLocation(LOCAL_FILE_LOCATION) //
+            .internalLocation(LOCAL_FILE_LOCATION.toString()) //
             .compression(GZIP_COMPRESSION) //
             .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
             .fileFormatVersion(FILE_FORMAT_VERSION) //
             .changeIdentifier(CHANGE_IDENTIFIER) //
-            .context(new HashMap<String, String>()) //
             .build();
         listOfFilePublishInformation.add(filePublishInformation);
 
@@ -213,7 +212,7 @@
             .verify();
 
         verify(messageRouterSubscriber, times(1))
-                .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
+            .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
     }
 
     @Test
@@ -224,10 +223,8 @@
             .expectNext(expectedFtpesMessage) //
             .verifyComplete();
 
-
-
         verify(messageRouterSubscriber, times(1))
-                .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
+            .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
         verifyNoMoreInteractions(messageRouterSubscriber);
     }
 
@@ -240,7 +237,7 @@
             .verifyComplete();
 
         verify(messageRouterSubscriber, times(1))
-                .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
+            .getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest());
         verifyNoMoreInteractions(messageRouterSubscriber);
     }
 
@@ -250,11 +247,11 @@
 
         messageRouterSubscriber = mock(MessageRouterSubscriber.class);
         dmaapConsumerConfiguration = new ConsumerConfiguration(mock(MessageRouterSubscriberConfig.class),
-                messageRouterSubscriber, mock(MessageRouterSubscribeRequest.class));
+            messageRouterSubscriber, mock(MessageRouterSubscribeRequest.class));
 
         JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
         when(messageRouterSubscriber.getElements(dmaapConsumerConfiguration.getMessageRouterSubscribeRequest()))
-                .thenReturn(messageAsMono);
+            .thenReturn(messageAsMono);
         when(appConfig.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
 
         if (message.isPresent()) {
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
deleted file mode 100644
index 8cf5fa9..0000000
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018-2020 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.read.ListAppender;
-
-import java.io.File;
-import java.net.URI;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.http.Header;
-import org.apache.http.HttpResponse;
-import org.apache.http.StatusLine;
-import org.apache.http.client.methods.HttpPut;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.model.Counters;
-import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
-import org.springframework.http.HttpStatus;
-import reactor.test.StepVerifier;
-
-/**
- * Tests the DataRouter publisher.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-class DataRouterPublisherTest {
-
-    private static final String PRODUCT_NAME = "NrRadio";
-    private static final String VENDOR_NAME = "Ericsson";
-    private static final String LAST_EPOCH_MICROSEC = "8745745764578";
-    private static final String SOURCE_NAME = "oteNB5309";
-    private static final String START_EPOCH_MICROSEC = "8745745764578";
-    private static final String TIME_ZONE_OFFSET = "UTC+05:00";
-    private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
-    private static final String FTPES_ADDRESS = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME;
-    private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
-
-    private static final String COMPRESSION = "gzip";
-    private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
-    private static final String FILE_FORMAT_VERSION = "V10";
-    private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
-
-    private static final String HOST = "54.45.33.2";
-    private static final String HTTPS_SCHEME = "https";
-    private static final int PORT = 1234;
-    private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
-    private static final String PUBLISH_TOPIC = "publish";
-    private static final String FEED_ID = "1";
-
-    // "https://54.45.333.2:1234/publish/1";
-    private static final String PUBLISH_URL =
-        HTTPS_SCHEME + "://" + HOST + ":" + PORT + "/" + PUBLISH_TOPIC + "/" + FEED_ID;
-
-    private static FilePublishInformation filePublishInformation;
-    private static DmaapProducerHttpClient httpClientMock;
-    private static AppConfig appConfig;
-    private static PublisherConfiguration publisherConfigurationMock = mock(PublisherConfiguration.class);
-    private static Map<String, String> context = new HashMap<>();
-    private static DataRouterPublisher publisherTaskUnderTestSpy;
-    private Counters counters;
-
-    @BeforeAll
-    public static void setUp() {
-        when(publisherConfigurationMock.publishUrl()).thenReturn(PUBLISH_URL);
-
-        filePublishInformation = ImmutableFilePublishInformation.builder() //
-            .productName(PRODUCT_NAME) //
-            .vendorName(VENDOR_NAME) //
-            .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
-            .sourceName(SOURCE_NAME) //
-            .startEpochMicrosec(START_EPOCH_MICROSEC) //
-            .timeZoneOffset(TIME_ZONE_OFFSET) //
-            .name(PM_FILE_NAME) //
-            .location(FTPES_ADDRESS) //
-            .internalLocation(Paths.get("target/" + PM_FILE_NAME)) //
-            .compression("gzip") //
-            .fileFormatType(FILE_FORMAT_TYPE) //
-            .fileFormatVersion(FILE_FORMAT_VERSION) //
-            .context(context) //
-            .changeIdentifier(CHANGE_IDENTIFIER) //
-            .build(); //
-        appConfig = mock(AppConfig.class);
-    }
-
-    @BeforeEach
-    void setUpTest() {
-        counters = new Counters();
-        publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig, counters));
-    }
-
-    @Test
-    public void whenPassedObjectFits_ReturnsCorrectStatus() throws Exception {
-        prepareMocksForTests(null, Integer.valueOf(HttpStatus.OK.value()));
-        StepVerifier //
-            .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0)))
-            .expectNext(filePublishInformation) //
-            .verifyComplete();
-
-        ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
-        verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
-        verify(httpClientMock).getDmaapProducerResponseWithRedirect(requestCaptor.capture(), any());
-        verifyNoMoreInteractions(httpClientMock);
-
-        HttpPut actualPut = (HttpPut) requestCaptor.getValue();
-        URI actualUri = actualPut.getURI();
-        assertEquals(HTTPS_SCHEME, actualUri.getScheme());
-        assertEquals(HOST, actualUri.getHost());
-        assertEquals(PORT, actualUri.getPort());
-
-        Path actualPath = Paths.get(actualUri.getPath());
-        assertTrue(PUBLISH_TOPIC.equals(actualPath.getName(0).toString()));
-        assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
-        assertTrue(PM_FILE_NAME.equals(actualPath.getName(2).toString()));
-
-        Header[] contentHeaders = actualPut.getHeaders("content-type");
-        assertEquals(APPLICATION_OCTET_STREAM_CONTENT_TYPE, contentHeaders[0].getValue());
-
-        Header[] metaHeaders = actualPut.getHeaders(X_DMAAP_DR_META);
-        Map<String, String> metaHash = getMetaDataAsMap(metaHeaders);
-
-        assertEquals(PRODUCT_NAME, metaHash.get("productName"));
-        assertEquals(VENDOR_NAME, metaHash.get("vendorName"));
-        assertEquals(LAST_EPOCH_MICROSEC, metaHash.get("lastEpochMicrosec"));
-        assertEquals(SOURCE_NAME, metaHash.get("sourceName"));
-        assertEquals(START_EPOCH_MICROSEC, metaHash.get("startEpochMicrosec"));
-        assertEquals(TIME_ZONE_OFFSET, metaHash.get("timeZoneOffset"));
-        assertEquals(COMPRESSION, metaHash.get("compression"));
-        assertEquals(FTPES_ADDRESS, metaHash.get("location"));
-        assertEquals(FILE_FORMAT_TYPE, metaHash.get("fileFormatType"));
-        assertEquals(FILE_FORMAT_VERSION, metaHash.get("fileFormatVersion"));
-
-        // Note that the following line checks the number of properties that are sent to the data
-        // router.
-        // This should be 10 unless the API is updated (which is the fields checked above)
-        assertEquals(10, metaHash.size());
-
-        assertEquals(1, counters.getTotalPublishedFiles(),"totalPublishedFiles should have been 1");
-        assertEquals(0, counters.getNoOfFailedPublishAttempts(),"noOfFailedPublishAttempts should have been 0");
-    }
-
-    @Test
-    void whenPassedObjectFits_firstFailsWithExceptionThenSucceeds() throws Exception {
-        prepareMocksForTests(new DatafileTaskException("Error"), HttpStatus.OK.value());
-
-        final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DataRouterPublisher.class);
-        StepVerifier.create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 2, Duration.ofSeconds(0)))
-            .expectNext(filePublishInformation) //
-            .verifyComplete();
-
-        assertTrue(logAppender.list.toString().contains("[WARN] Publishing file " + PM_FILE_NAME + " to DR unsuccessful.")
-            ,"Warning missing in log");
-
-        assertEquals(1, counters.getTotalPublishedFiles(),"totalPublishedFiles should have been 1");
-        assertEquals(1, counters.getNoOfFailedPublishAttempts(),"noOfFailedPublishAttempts should have been 1");
-    }
-
-    @Test
-    public void whenPassedObjectFits_firstFailsThenSucceeds() throws Exception {
-        prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
-            Integer.valueOf(HttpStatus.OK.value()));
-
-        StepVerifier //
-            .create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0)))
-            .expectNext(filePublishInformation) //
-            .verifyComplete();
-
-        verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
-        verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
-        verifyNoMoreInteractions(httpClientMock);
-
-        assertEquals(1, counters.getTotalPublishedFiles(),"totalPublishedFiles should have been 1");
-        assertEquals(1, counters.getNoOfFailedPublishAttempts(),"noOfFailedPublishAttempts should have been 1");
-    }
-
-    @Test
-    public void whenPassedObjectFits_firstFailsThenFails() throws Exception {
-        prepareMocksForTests(null, Integer.valueOf(HttpStatus.BAD_GATEWAY.value()),
-            Integer.valueOf((HttpStatus.BAD_GATEWAY.value())));
-
-        final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DataRouterPublisher.class);
-        StepVerifier.create(publisherTaskUnderTestSpy.publishFile(filePublishInformation, 1, Duration.ofSeconds(0)))
-            .expectErrorMessage("Retries exhausted: 1/1") //
-            .verify();
-
-        assertTrue(logAppender.list.toString().contains("[WARN] Publishing file "
-            + PM_FILE_NAME + " to DR unsuccessful. Response code: " + HttpStatus.BAD_GATEWAY),"Warning missing in log");
-
-        verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
-        verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
-        verifyNoMoreInteractions(httpClientMock);
-
-        assertEquals(0, counters.getTotalPublishedFiles(),"totalPublishedFiles should have been 0");
-        assertEquals(2, counters.getNoOfFailedPublishAttempts(),"noOfFailedPublishAttempts should have been 2");
-    }
-
-    @SafeVarargs
-    final void prepareMocksForTests(Exception exception, Integer firstResponse, Integer... nextHttpResponses)
-        throws Exception {
-        httpClientMock = mock(DmaapProducerHttpClient.class);
-        when(appConfig.getPublisherConfiguration(CHANGE_IDENTIFIER)).thenReturn(publisherConfigurationMock);
-        doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration(CHANGE_IDENTIFIER);
-        doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient(CHANGE_IDENTIFIER);
-
-        HttpResponse httpResponseMock = mock(HttpResponse.class);
-        if (exception == null) {
-            when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()))
-                .thenReturn(httpResponseMock);
-        } else {
-            when(httpClientMock.getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any()))
-                .thenThrow(exception).thenReturn(httpResponseMock);
-        }
-        StatusLine statusLineMock = mock(StatusLine.class);
-        when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
-        when(statusLineMock.getStatusCode()).thenReturn(firstResponse, nextHttpResponses);
-
-        File file = File.createTempFile("DFC", "tmp");
-        doReturn(file).when(publisherTaskUnderTestSpy).createInputFile(Paths.get("target", PM_FILE_NAME));
-    }
-
-    private Map<String, String> getMetaDataAsMap(Header[] metaHeaders) {
-        Map<String, String> metaHash = new HashMap<>();
-        String actualMetaData = metaHeaders[0].getValue();
-        actualMetaData = actualMetaData.substring(1, actualMetaData.length() - 1);
-        actualMetaData = actualMetaData.replace("\"", "");
-        String[] commaSplitedMetaData = actualMetaData.split(",");
-        for (int i = 0; i < commaSplitedMetaData.length; i++) {
-            String[] keyValuePair = commaSplitedMetaData[i].split(":");
-            if (keyValuePair.length > 2) {
-                List<String> arrayKeyValuePair = new ArrayList<>(keyValuePair.length);
-                for (int j = 1; j < keyValuePair.length; j++) {
-                    arrayKeyValuePair.add(keyValuePair[j]);
-                }
-                keyValuePair[1] = String.join(":", arrayKeyValuePair);
-            }
-            metaHash.put(keyValuePair[0], keyValuePair[1]);
-        }
-        return metaHash;
-    }
-}
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index 917055c..b0fe0b2 100644
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
+++ b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -18,7 +18,6 @@
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -38,12 +37,12 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.CertificateConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.ftp.FtpesClient;
-import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
 import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
 import org.onap.dcaegen2.collectors.datafile.http.DfcHttpClient;
 import org.onap.dcaegen2.collectors.datafile.http.DfcHttpsClient;
@@ -51,7 +50,6 @@
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
 import reactor.test.StepVerifier;
@@ -85,13 +83,13 @@
     private static final String SFTP_LOCATION_NO_PORT = SFTP_SCHEME + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
 
     private static final String HTTP_LOCATION =
-            HTTP_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+        HTTP_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
     private static final String HTTP_LOCATION_NO_PORT =
-            HTTP_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
+        HTTP_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
     private static final String HTTPS_LOCATION =
-            HTTP_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+        HTTP_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
     private static final String HTTPS_LOCATION_NO_PORT =
-            HTTP_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
+        HTTP_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
 
     private static final String GZIP_COMPRESSION = "gzip";
     private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
@@ -142,7 +140,7 @@
     }
 
     private FilePublishInformation createExpectedFilePublishInformation(String location) {
-        return ImmutableFilePublishInformation.builder() //
+        return FilePublishInformation.builder() //
             .productName(PRODUCT_NAME) //
             .vendorName(VENDOR_NAME) //
             .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
@@ -151,11 +149,10 @@
             .timeZoneOffset(TIME_ZONE_OFFSET) //
             .name(PM_FILE_NAME) //
             .location(location) //
-            .internalLocation(LOCAL_FILE_LOCATION) //
+            .internalLocation(LOCAL_FILE_LOCATION.toString()) //
             .compression(GZIP_COMPRESSION) //
             .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
             .fileFormatVersion(FILE_FORMAT_VERSION) //
-            .context(new HashMap<String, String>()) //
             .changeIdentifier(CHANGE_IDENTIFIER) //
             .build();
     }
@@ -193,13 +190,11 @@
         verify(ftpesClientMock, times(1)).close();
         verifyNoMoreInteractions(ftpesClientMock);
 
-        assertEquals(1, counters.getNoOfCollectedFiles(),"collectedFiles should have been 1");
-        assertEquals(0, counters.getNoOfFailedFtpAttempts(),"failedFtpAttempts should have been 0");
-        assertEquals(0, counters.getNoOfFailedHttpAttempts(),"failedHttpAttempts should have been 0");
+        assertEquals(1, counters.getNoOfCollectedFiles(), "collectedFiles should have been 1");
+        assertEquals(0, counters.getNoOfFailedFtpAttempts(), "failedFtpAttempts should have been 0");
+        assertEquals(0, counters.getNoOfFailedHttpAttempts(), "failedHttpAttempts should have been 0");
     }
 
-
-
     @Test
     public void whenSftpFile_returnCorrectResponse() throws Exception {
         FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters));
@@ -226,7 +221,7 @@
         verify(sftpClientMock, times(2)).close();
         verifyNoMoreInteractions(sftpClientMock);
 
-        assertEquals(2, counters.getNoOfCollectedFiles(),"collectedFiles should have been 2");
+        assertEquals(2, counters.getNoOfCollectedFiles(), "collectedFiles should have been 2");
     }
 
     @Test
@@ -237,28 +232,28 @@
         FileData fileData = createFileData(HTTP_LOCATION_NO_PORT, Scheme.HTTP);
 
         FilePublishInformation expectedfilePublishInformation =
-                createExpectedFilePublishInformation(HTTP_LOCATION_NO_PORT);
+            createExpectedFilePublishInformation(HTTP_LOCATION_NO_PORT);
 
         StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap))
-                .expectNext(expectedfilePublishInformation) //
-                .verifyComplete();
+            .expectNext(expectedfilePublishInformation) //
+            .verifyComplete();
 
         // The same again, but with port
         fileData = createFileData(HTTP_LOCATION, Scheme.HTTP);
         expectedfilePublishInformation = createExpectedFilePublishInformation(HTTP_LOCATION);
 
         StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap))
-                .expectNext(expectedfilePublishInformation) //
-                .verifyComplete();
+            .expectNext(expectedfilePublishInformation) //
+            .verifyComplete();
 
         verify(dfcHttpClientMock, times(2)).open();
         verify(dfcHttpClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
         verify(dfcHttpClientMock, times(2)).close();
         verifyNoMoreInteractions(dfcHttpClientMock);
 
-        assertEquals(2, counters.getNoOfCollectedFiles(),"collectedFiles should have been 1");
-        assertEquals(0, counters.getNoOfFailedFtpAttempts(),"failedFtpAttempts should have been 0");
-        assertEquals(0, counters.getNoOfFailedHttpAttempts(),"failedHttpAttempts should have been 0");
+        assertEquals(2, counters.getNoOfCollectedFiles(), "collectedFiles should have been 2");
+        assertEquals(0, counters.getNoOfFailedFtpAttempts(), "failedFtpAttempts should have been 0");
+        assertEquals(0, counters.getNoOfFailedHttpAttempts(), "failedHttpAttempts should have been 0");
     }
 
     @Test
@@ -269,28 +264,28 @@
         FileData fileData = createFileData(HTTPS_LOCATION_NO_PORT, Scheme.HTTPS);
 
         FilePublishInformation expectedfilePublishInformation =
-                createExpectedFilePublishInformation(HTTPS_LOCATION_NO_PORT);
+            createExpectedFilePublishInformation(HTTPS_LOCATION_NO_PORT);
 
         StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap))
-                .expectNext(expectedfilePublishInformation) //
-                .verifyComplete();
+            .expectNext(expectedfilePublishInformation) //
+            .verifyComplete();
 
         // The same again, but with port
         fileData = createFileData(HTTPS_LOCATION, Scheme.HTTPS);
         expectedfilePublishInformation = createExpectedFilePublishInformation(HTTPS_LOCATION);
 
         StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap))
-                .expectNext(expectedfilePublishInformation) //
-                .verifyComplete();
+            .expectNext(expectedfilePublishInformation) //
+            .verifyComplete();
 
         verify(dfcHttpsClientMock, times(2)).open();
         verify(dfcHttpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
         verify(dfcHttpsClientMock, times(2)).close();
         verifyNoMoreInteractions(dfcHttpsClientMock);
 
-        assertEquals(2, counters.getNoOfCollectedFiles(),"collectedFiles should have been 1");
-        assertEquals(0, counters.getNoOfFailedFtpAttempts(),"failedFtpAttempts should have been 0");
-        assertEquals(0, counters.getNoOfFailedHttpAttempts(),"failedHttpAttempts should have been 0");
+        assertEquals(2, counters.getNoOfCollectedFiles(), "collectedFiles should have been 2");
+        assertEquals(0, counters.getNoOfFailedFtpAttempts(), "failedFtpAttempts should have been 0");
+        assertEquals(0, counters.getNoOfFailedHttpAttempts(), "failedHttpAttempts should have been 0");
     }
 
     @Test
@@ -300,12 +295,11 @@
         FileData fileData = createFileData(HTTPS_LOCATION, Scheme.HTTPS);
 
         StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap))
-            .expectErrorMessage("Retries exhausted: 3/3")
-            .verify();
+            .expectErrorMessage("Retries exhausted: 3/3").verify();
 
         StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap))
-            .consumeErrorWith(throwable ->
-                assertEquals("HTTPS error: TLS connection is disabled", throwable.getCause().getMessage()))
+            .consumeErrorWith(
+                throwable -> assertEquals("HTTPS error: TLS connection is disabled", throwable.getCause().getMessage()))
             .verify();
     }
 
@@ -316,12 +310,11 @@
         FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPES);
 
         StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap))
-            .expectErrorMessage("Retries exhausted: 3/3")
-            .verify();
+            .expectErrorMessage("Retries exhausted: 3/3").verify();
 
         StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap))
-            .consumeErrorWith(throwable ->
-                assertEquals("FTPES error: TLS connection is disabled", throwable.getCause().getMessage()))
+            .consumeErrorWith(
+                throwable -> assertEquals("FTPES error: TLS connection is disabled", throwable.getCause().getMessage()))
             .verify();
     }
 
@@ -340,9 +333,9 @@
 
         verify(ftpesClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
 
-        assertEquals(0, counters.getNoOfCollectedFiles(),"collectedFiles should have been 0");
-        assertEquals(4, counters.getNoOfFailedFtpAttempts(),"failedFtpAttempts should have been 4");
-        assertEquals(0, counters.getNoOfFailedHttpAttempts(),"failedHttpAttempts should have been 0");
+        assertEquals(0, counters.getNoOfCollectedFiles(), "collectedFiles should have been 0");
+        assertEquals(4, counters.getNoOfFailedFtpAttempts(), "failedFtpAttempts should have been 4");
+        assertEquals(0, counters.getNoOfFailedHttpAttempts(), "failedHttpAttempts should have been 0");
     }
 
     @Test
@@ -360,9 +353,9 @@
 
         verify(ftpesClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
 
-        assertEquals(0, counters.getNoOfCollectedFiles(),"collectedFiles should have been 0");
-        assertEquals(1, counters.getNoOfFailedFtpAttempts(),"failedFtpAttempts should have been 1");
-        assertEquals(0, counters.getNoOfFailedHttpAttempts(),"failedHttpAttempts should have been 0");
+        assertEquals(0, counters.getNoOfCollectedFiles(), "collectedFiles should have been 0");
+        assertEquals(1, counters.getNoOfFailedFtpAttempts(), "failedFtpAttempts should have been 1");
+        assertEquals(0, counters.getNoOfFailedHttpAttempts(), "failedHttpAttempts should have been 0");
     }
 
     @Test
@@ -383,8 +376,8 @@
 
         verify(ftpesClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
 
-        assertEquals(1, counters.getNoOfCollectedFiles(),"collectedFiles should have been 1");
-        assertEquals(1, counters.getNoOfFailedFtpAttempts(),"failedFtpAttempts should have been 1");
-        assertEquals(0, counters.getNoOfFailedHttpAttempts(),"failedHttpAttempts should have been 0");
+        assertEquals(1, counters.getNoOfCollectedFiles(), "collectedFiles should have been 1");
+        assertEquals(1, counters.getNoOfFailedFtpAttempts(), "failedFtpAttempts should have been 1");
+        assertEquals(0, counters.getNoOfFailedHttpAttempts(), "failedHttpAttempts should have been 0");
     }
 }
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
deleted file mode 100644
index 2389c15..0000000
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*-
-* ============LICENSE_START=======================================================
-*  Copyright (C) 2019 Nordix Foundation.
-*  Copyright (C) 2020 Nokia. All rights reserved.
-* ================================================================================
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*      http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*
-* SPDX-License-Identifier: Apache-2.0
-* ============LICENSE_END=========================================================
-*/
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.StatusLine;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-
-public class PublishedCheckerTest {
-    private static final String PUBLISH_URL = "https://54.45.33.2:1234/";
-    private static final String EMPTY_CONTENT = "[]";
-    private static final String SOURCE_NAME = "oteNB5309";
-    private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
-    private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + FILE_NAME;
-    private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
-    private static final String LOG_URI = "https://localhost:3907/feedlog/1";
-
-    private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
-
-    private static PublisherConfiguration publisherConfigurationMock = mock(PublisherConfiguration.class);
-    private static AppConfig appConfigMock;
-    private DmaapProducerHttpClient httpClientMock = mock(DmaapProducerHttpClient.class);
-
-    private PublishedChecker publishedCheckerUnderTestSpy;
-
-    @BeforeAll
-    private static void setUp() throws DatafileTaskException {
-        when(publisherConfigurationMock.publishUrl()).thenReturn(PUBLISH_URL);
-
-        appConfigMock = mock(AppConfig.class);
-        when(appConfigMock.getPublisherConfiguration(CHANGE_IDENTIFIER)).thenReturn(publisherConfigurationMock);
-    }
-
-    @Test
-    public void executeWhenNotPublished_returnsFalse() throws Exception {
-        prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, null);
-
-        boolean isPublished =
-            publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
-
-        assertFalse(isPublished);
-
-        ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
-        verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
-        verify(httpClientMock).getDmaapProducerResponseWithCustomTimeout(requestCaptor.capture(), any(), any());
-        verifyNoMoreInteractions(httpClientMock);
-
-        HttpUriRequest getRequest = requestCaptor.getValue();
-        assertTrue(getRequest instanceof HttpGet);
-        URI actualUri = getRequest.getURI();
-        // https://localhost:3907/feedlog/1?type=pub&filename=oteNB5309_A20161224.1030-1045.bin.gz
-        String expUri = LOG_URI + "?type=pub&filename=" + LOCAL_FILE_NAME;
-        assertEquals(expUri, actualUri.toString());
-    }
-
-    @Test
-    public void executeWhenDataRouterReturnsNok_returnsFalse() throws Exception {
-        prepareMocksForTests(HttpUtils.SC_BAD_REQUEST, EMPTY_CONTENT, null);
-
-        boolean isPublished =
-            publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
-
-        assertFalse(isPublished);
-    }
-
-    @Test
-    public void executeWhenPublished_returnsTrue() throws Exception {
-        prepareMocksForTests(HttpUtils.SC_OK, "[" + LOCAL_FILE_NAME + "]", null);
-
-        boolean isPublished =
-            publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
-
-        assertTrue(isPublished);
-    }
-
-    @Test
-    public void executeWhenErrorInDataRouter_returnsFalse() throws Exception {
-        prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, new DatafileTaskException(""));
-
-        boolean isPublished =
-            publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
-
-        assertFalse(isPublished);
-    }
-
-    final void prepareMocksForTests(int responseCode, String content, Exception exception) throws Exception {
-        publishedCheckerUnderTestSpy = spy(new PublishedChecker(appConfigMock));
-
-        doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration(CHANGE_IDENTIFIER);
-        doReturn(LOG_URI).when(publisherConfigurationMock).logUrl();
-        doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient(publisherConfigurationMock);
-
-        HttpResponse httpResponseMock = mock(HttpResponse.class);
-        if (exception == null) {
-            when(httpClientMock.getDmaapProducerResponseWithCustomTimeout(any(HttpUriRequest.class), any(), any()))
-                .thenReturn(httpResponseMock);
-        } else {
-            when(httpClientMock.getDmaapProducerResponseWithCustomTimeout(any(HttpUriRequest.class), any(), any()))
-                .thenThrow(exception);
-        }
-        HttpEntity httpEntityMock = mock(HttpEntity.class);
-        StatusLine statusLineMock = mock(StatusLine.class);
-        when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
-        when(statusLineMock.getStatusCode()).thenReturn(responseCode);
-        when(httpResponseMock.getEntity()).thenReturn(httpEntityMock);
-        InputStream stream = new ByteArrayInputStream(content.getBytes());
-        when(httpEntityMock.getContent()).thenReturn(stream);
-    }
-}
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 85f52ff..45ba420 100644
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -27,7 +27,6 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.notNull;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -50,17 +49,16 @@
 import org.apache.commons.lang3.StringUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
 import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
 import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
 import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
 import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
@@ -85,9 +83,7 @@
 
     private int uniqueValue = 0;
     private DMaaPMessageConsumer consumerMock;
-    private PublishedChecker publishedCheckerMock;
     private FileCollector fileCollectorMock;
-    private DataRouterPublisher dataRouterMock;
     private Map<String, String> contextMap = new HashMap<String, String>();
 
     private final String publishUrl = "https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT";
@@ -97,14 +93,11 @@
         testedObject = spy(new ScheduledTasks(appConfig));
 
         consumerMock = mock(DMaaPMessageConsumer.class);
-        publishedCheckerMock = mock(PublishedChecker.class);
+
         fileCollectorMock = mock(FileCollector.class);
-        dataRouterMock = mock(DataRouterPublisher.class);
 
         doReturn(consumerMock).when(testedObject).createConsumerTask();
-        doReturn(publishedCheckerMock).when(testedObject).createPublishedChecker();
         doReturn(fileCollectorMock).when(testedObject).createFileCollector();
-        doReturn(dataRouterMock).when(testedObject).createDataRouterPublisher();
     }
 
     private void setUpConfiguration() throws DatafileTaskException {
@@ -124,10 +117,8 @@
             new ConsumerConfiguration(mock(MessageRouterSubscriberConfig.class), mock(MessageRouterSubscriber.class),
                 mock(MessageRouterSubscribeRequest.class));
 
-
         doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
         doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
-        doReturn(true).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
     }
 
     private MessageMetaData messageMetaData() {
@@ -179,7 +170,7 @@
     }
 
     private FilePublishInformation filePublishInformation() {
-        return ImmutableFilePublishInformation //
+        return FilePublishInformation //
             .builder() //
             .productName("") //
             .vendorName("") //
@@ -189,12 +180,12 @@
             .timeZoneOffset("") //
             .name("") //
             .location("") //
-            .internalLocation(Paths.get("internalLocation")) //
+            .internalLocation("internalLocation") //
             .compression("") //
             .fileFormatType("") //
             .fileFormatVersion("") //
             .changeIdentifier(CHANGE_IDENTIFIER) //
-            .context(new HashMap<String, String>()).build();
+            .build();
     }
 
     @Test
@@ -239,60 +230,16 @@
         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
 
-        doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any(), any());
-
         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
-        doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
 
         testedObject.executeDatafileMainTask();
 
-        await().untilAsserted(() -> assertEquals(0,
-            testedObject.getCurrentNumberOfSubscriptions()));
+        await().untilAsserted(() -> assertEquals(0, testedObject.getCurrentNumberOfSubscriptions()));
 
         assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID)));
 
-        verify(appConfig).getDmaapConsumerConfiguration();
-        verify(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
-        verifyNoMoreInteractions(appConfig);
-
-        assertEquals(1, testedObject.getCounters().getTotalReceivedEvents(),"totalReceivedEvents should have been 1");
-    }
-
-    @Test
-    public void executeDatafileMainTask_unconfiguredChangeIdentifier() throws DatafileTaskException {
-        final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
-            .publishUrl(publishUrl) //
-            .logUrl("") //
-            .userName("userName") //
-            .password("passWord") //
-            .trustStorePath("trustStorePath") //
-            .trustStorePasswordPath("trustStorePasswordPath") //
-            .keyStorePath("keyStorePath") //
-            .keyStorePasswordPath("keyStorePasswordPath") //
-            .enableDmaapCertAuth(true) //
-            .changeIdentifier("Different changeIdentifier") //
-            .build(); //
-        final ConsumerConfiguration dmaapConsumerConfiguration =
-            new ConsumerConfiguration(mock(MessageRouterSubscriberConfig.class), mock(MessageRouterSubscriber.class),
-                mock(MessageRouterSubscribeRequest.class));
-
-        doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
-        doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
-        doReturn(false).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
-        final int noOfEvents = 1;
-        final int noOfFilesPerEvent = 1;
-
-        Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
-        doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
-
-        final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
-        testedObject.executeDatafileMainTask();
-
-        await().untilAsserted(() -> assertEquals(0, testedObject.getCurrentNumberOfSubscriptions(),"currentNumberOfSubscriptions should have been 0"));
-
-        assertTrue(logAppender.list.toString().contains(
-            "[INFO] No feed is configured for: " + CHANGE_IDENTIFIER + ", file ignored: " + PM_FILE_NAME + "1"),"Error missing in log");
+        assertEquals(1, testedObject.getCounters().getTotalReceivedEvents(), "totalReceivedEvents should have been 1");
     }
 
     @Test
@@ -308,8 +255,10 @@
             .expectComplete() //
             .verify(); //
 
-        assertTrue(logAppender.list.toString()
-            .contains("[ERROR] Polling for file ready message failed, " + "exception: java.lang.Exception: Failed"),"Error missing in log");
+        assertTrue(
+            logAppender.list.toString()
+                .contains("[ERROR] Polling for file ready message failed, " + "exception: java.lang.Exception: Failed"),
+            "Error missing in log");
     }
 
     @Test
@@ -323,11 +272,8 @@
         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
 
-        doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
-
         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
-        doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
 
         StepVerifier //
             .create(testedObject.createMainTask(contextMap)) //
@@ -345,11 +291,8 @@
         verify(fileCollectorMock, times(noOfFiles)).collectFile(notNull(), anyLong(), notNull(), notNull());
         verifyNoMoreInteractions(fileCollectorMock);
 
-        verify(dataRouterMock, times(noOfFiles)).publishFile(notNull(), anyLong(), notNull());
-        verifyNoMoreInteractions(dataRouterMock);
-
-        assertEquals(200,
-            testedObject.getCounters().getTotalReceivedEvents(),"totalReceivedEvents should have been 200");
+        assertEquals(200, testedObject.getCounters().getTotalReceivedEvents(),
+            "totalReceivedEvents should have been 200");
     }
 
     @Test
@@ -359,8 +302,6 @@
         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
 
-        doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
-
         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
         Mono<Object> error = Mono.error(new Exception("problem"));
 
@@ -369,9 +310,6 @@
             .when(fileCollectorMock) //
             .collectFile(any(FileData.class), anyLong(), any(Duration.class), notNull());
 
-        doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
-        doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
-
         StepVerifier //
             .create(testedObject.createMainTask(contextMap)) //
             .expectSubscription() //
@@ -387,55 +325,9 @@
         verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
         verifyNoMoreInteractions(fileCollectorMock);
 
-        verify(dataRouterMock, times(3)).publishFile(notNull(), anyLong(), notNull());
-        verifyNoMoreInteractions(dataRouterMock);
-
-        assertEquals(2, testedObject.getCounters().getTotalReceivedEvents(),"totalReceivedEvents should have been 2");
-        assertEquals(1, testedObject.getCounters().getNoOfFailedFtp(),"failedFtp should have been 1");
-        assertEquals(0, testedObject.getCounters().getNoOfFailedHttp(),"failedHttp should have been 0");
-    }
-
-    @Test
-    public void consume_publishFailedOnce() throws DatafileTaskException {
-        setUpConfiguration();
-
-        Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
-        doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
-
-        doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
-
-        Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
-        doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
-
-        Mono<Object> error = Mono.error(new Exception("problem"));
-        // One publish will fail, the rest will succeed
-        doReturn(collectedFile, error, collectedFile, collectedFile) //
-            .when(dataRouterMock) //
-            .publishFile(notNull(), anyLong(), notNull());
-
-        final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ScheduledTasks.class);
-        StepVerifier //
-            .create(testedObject.createMainTask(contextMap)) //
-            .expectSubscription() //
-            .expectNextCount(3) // 3 completed files
-            .expectComplete() //
-            .verify(); //
-
-        assertTrue(logAppender.list.toString().contains("[ERROR] File publishing failed: "));
-
-        assertEquals(0, testedObject.getCurrentNumberOfTasks());
-
-        verify(consumerMock, times(1)).getMessageRouterResponse();
-        verifyNoMoreInteractions(consumerMock);
-
-        verify(fileCollectorMock, times(4)).collectFile(notNull(), anyLong(), notNull(), notNull());
-        verifyNoMoreInteractions(fileCollectorMock);
-
-        verify(dataRouterMock, times(4)).publishFile(notNull(), anyLong(), notNull());
-        verifyNoMoreInteractions(dataRouterMock);
-
-        assertEquals(2, testedObject.getCounters().getTotalReceivedEvents(),"totalReceivedEvents should have been 2");
-        assertEquals(1, testedObject.getCounters().getNoOfFailedPublish(),"noOfFailedPublish should have been 1");
+        assertEquals(2, testedObject.getCounters().getTotalReceivedEvents(), "totalReceivedEvents should have been 2");
+        assertEquals(1, testedObject.getCounters().getNoOfFailedFtp(), "failedFtp should have been 1");
+        assertEquals(0, testedObject.getCounters().getNoOfFailedHttp(), "failedHttp should have been 0");
     }
 
     @Test
@@ -449,15 +341,12 @@
         Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
         doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
 
-        doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
-
         Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
         doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
-        doReturn(collectedFile).when(dataRouterMock).publishFile(notNull(), anyLong(), notNull());
 
         StepVerifier //
             .create(testedObject.createMainTask(contextMap)).expectSubscription() //
-            .expectNextCount(1) // 99 is skipped
+            .expectNextCount(100) // TODO: 99 files used to be skipped here; all 100 are now collected
             .expectComplete() //
             .verify(); //
 
@@ -466,15 +355,9 @@
         verify(consumerMock, times(1)).getMessageRouterResponse();
         verifyNoMoreInteractions(consumerMock);
 
-        verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
+        verify(fileCollectorMock, times(100)).collectFile(notNull(), anyLong(), notNull(), notNull());
         verifyNoMoreInteractions(fileCollectorMock);
 
-        verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
-        verifyNoMoreInteractions(dataRouterMock);
-
-        verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), anyString(), notNull());
-        verifyNoMoreInteractions(publishedCheckerMock);
-
-        assertEquals(1, testedObject.getCounters().getTotalReceivedEvents(),"totalReceivedEvents should have been 1");
+        assertEquals(1, testedObject.getCounters().getTotalReceivedEvents(), "totalReceivedEvents should have been 1");
     }
 }
diff --git a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/SecurityUtilTest.java b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/SecurityUtilTest.java
index 75ad8a4..aba2e9d 100644
--- a/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/SecurityUtilTest.java
+++ b/datafile/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/SecurityUtilTest.java
@@ -15,13 +15,10 @@
  */
 package org.onap.dcaegen2.collectors.datafile.utils;
 
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.onap.dcaegen2.collectors.datafile.commons.SecurityUtil;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -29,10 +26,13 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.powermock.api.mockito.PowerMockito.when;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.onap.dcaegen2.collectors.datafile.commons.SecurityUtil;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 /**
  * Tests the SecurityUtil.