Merge "Fix security vulnerabilities"
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..82f58ed
--- /dev/null
+++ b/README.md
@@ -0,0 +1,41 @@
+# DFC (DataFile Collector)
+
+Physical Network Function Registration Handler is responsible for registration of PNF (Physical Network Function) to
+ONAP (Open Network Automation Platform) in plug and play manner.
+
+## Introduction
+
+DFC is delivered as one **Docker container** which hosts application server and can be started by `docker-compose`.
+
+## Compiling DFC
+
+Whole project (top level of DFC directory) and each module (sub module directory) can be compiled using
+`mvn clean install` command.
+
+## Main API Endpoints
+
+Running with dev-mode of DFC
+
+- **Heartbeat**: http://<container_address>:8100/**heartbeat** or https://<container_address>:8443/**heartbeat**
+
+- **Start DFC**: http://<container_address>:8100/**start** or https://<container_address>:8433/**start**
+
+- **Stop DFC**: http://<container_address>:8100/**stopDatafile** or https://<container_address>:8433/**stopDatafile**
+
+## Maven GroupId:
+
+org.onap.dcaegen2.collectors
+
+### Maven Parent ArtifactId:
+
+dcae-services
+
+### Maven Children Artifacts:
+1. datafile-app-server: Datafile Collector (DFC) server
+2. datafile-commons: Common code for whole dfc modules
+3. datafile-dmaap-client: http client used to connect to dmaap message router/data router
+
+## License
+
+Copyright (C) 2018-2019 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+[License](http://www.apache.org/licenses/LICENSE-2.0)
diff --git a/datafile-app-server/config/datafile_endpoints.json b/datafile-app-server/config/datafile_endpoints.json
index 7918954..e1a9d38 100644
--- a/datafile-app-server/config/datafile_endpoints.json
+++ b/datafile-app-server/config/datafile_endpoints.json
@@ -26,9 +26,9 @@
         },
         "ftp": {
             "ftpesConfiguration": {
-                "keyCert": "config/ftpKey.jks",
+                "keyCert": "config/dfc.jks",
                 "keyPassword": "secret",
-                "trustedCA": "config/cacerts",
+                "trustedCA": "config/ftp.jks",
                 "trustedCAPassword": "secret"
             }
         },
diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml
index 3a53c13..4e8f5c5 100644
--- a/datafile-app-server/pom.xml
+++ b/datafile-app-server/pom.xml
@@ -139,10 +139,6 @@
       <artifactId>cbs-client</artifactId>
     </dependency>
     <dependency>
-      <groupId>io.projectreactor</groupId>
-      <artifactId>reactor-core</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index e1e5af2..5bbacb1 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
index 3af5545..59bb259 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfig.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -16,6 +16,13 @@
 
 package org.onap.dcaegen2.collectors.datafile.configuration;
 
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.TypeAdapterFactory;
+
 import java.io.BufferedInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -26,7 +33,6 @@
 import javax.validation.constraints.NotEmpty;
 import javax.validation.constraints.NotNull;
 
-
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.slf4j.Logger;
@@ -35,13 +41,6 @@
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
 
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
-import com.google.gson.TypeAdapterFactory;
-
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
@@ -58,7 +57,6 @@
     private static final String FTP = "ftp";
     private static final String FTPES_CONFIGURATION = "ftpesConfiguration";
     private static final String SECURITY = "security";
-
     private static final Logger logger = LoggerFactory.getLogger(DatafileAppConfig.class);
 
     DmaapConsumerConfiguration dmaapConsumerConfiguration;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
index 6420b4a..478ae30 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfig.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -21,9 +21,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ScheduledFuture;
-
 import javax.annotation.PostConstruct;
-
 import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Configuration;
@@ -31,7 +29,6 @@
 import org.springframework.http.ResponseEntity;
 import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.annotation.EnableScheduling;
-
 import io.swagger.annotations.ApiOperation;
 import reactor.core.publisher.Mono;
 
@@ -44,7 +41,7 @@
 
     private static final int SCHEDULING_DELAY_FOR_DATAFILE_COLLECTOR_TASKS = 15;
     private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
-    private static volatile List<ScheduledFuture> scheduledFutureList = new ArrayList<>();
+    private static volatile List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
 
     private final TaskScheduler taskScheduler;
     private final ScheduledTasks scheduledTask;
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
index 5765b31..825308e 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/controllers/ScheduleController.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java
deleted file mode 100644
index a1758ea..0000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapEmptyResponseException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.exceptions;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18
- */
-public class DmaapEmptyResponseException extends DatafileTaskException {
-
-    private static final long serialVersionUID = 1L;
-
-    public DmaapEmptyResponseException() {
-        super();
-    }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
index 401889f..3627901 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DmaapNotFoundException.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
index 5377b9c..bdb47b2 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileData.java
@@ -1,25 +1,31 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  * ============LICENSE_END========================================================================
  */
 
 package org.onap.dcaegen2.collectors.datafile.model;
 
+import java.net.URI;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
 
 /**
  * Contains data, from the fileReady event, about the file to collect from the xNF.
@@ -28,16 +34,56 @@
  */
 @Value.Immutable
 @Gson.TypeAdapters
-public interface FileData {
-    FileMetaData fileMetaData();
+public abstract class FileData {
+    private static final String DATAFILE_TMPDIR = "/tmp/onap_datafile/";
 
-    String name();
+    public abstract String name();
 
-    String location();
+    public abstract String location();
 
-    String compression();
+    public abstract Scheme scheme();
 
-    String fileFormatType();
+    public abstract String compression();
 
-    String fileFormatVersion();
-}
+    public abstract String fileFormatType();
+
+    public abstract String fileFormatVersion();
+
+    public String remoteFilePath() {
+        return URI.create(location()).getPath();
+    }
+
+    public Path getLocalFileName() {
+        URI uri = URI.create(location());
+        return createLocalFileName(uri.getHost(), name());
+    }
+
+    public static Path createLocalFileName(String host, String fileName) {
+        return Paths.get(DATAFILE_TMPDIR, host + "_" + fileName);
+    }
+
+    public FileServerData fileServerData() {
+        URI uri = URI.create(location());
+        Optional<String[]> userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
+        // @formatter:off
+        ImmutableFileServerData.Builder builder = ImmutableFileServerData.builder()
+                .serverAddress(uri.getHost())
+                .userId(userInfo.isPresent() ? userInfo.get()[0] : "")
+                .password(userInfo.isPresent() ? userInfo.get()[1] : "");
+        if (uri.getPort() > 0) {
+            builder.port(uri.getPort());
+        }
+        return builder.build();
+        // @formatter:on
+    }
+
+    private Optional<String[]> getUserNameAndPasswordIfGiven(String userInfoString) {
+        if (userInfoString != null) {
+            String[] userAndPassword = userInfoString.split(":");
+            if (userAndPassword.length == 2) {
+                return Optional.of(userAndPassword);
+            }
+        }
+        return Optional.empty();
+    }
+}
\ No newline at end of file
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
similarity index 63%
copy from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
copy to datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
index 7a04710..e3293fa 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileReadyMessage.java
@@ -1,7 +1,7 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -13,23 +13,27 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- * ============LICENSE_END========================================================================
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
  */
 
-package org.onap.dcaegen2.collectors.datafile.exceptions;
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import java.util.List;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
 
 /**
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
  */
-public class DatafileTaskException extends Exception {
+@Value.Immutable
+@Gson.TypeAdapters
+public interface FileReadyMessage {
+    public String pnfName();
 
-    private static final long serialVersionUID = 1L;
+    public MessageMetaData messageMetaData();
 
-    public DatafileTaskException() {
-        super();
-    }
-
-    public DatafileTaskException(String message) {
-        super(message);
-    }
+    public List<FileData> files();
 }
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
similarity index 72%
rename from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
rename to datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index 46c6e94..3c606de 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -1,17 +1,19 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START========================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ==================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
  */
 
 package org.onap.dcaegen2.collectors.datafile.service;
@@ -21,15 +23,19 @@
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.StreamSupport;
 
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.StringUtils;
@@ -38,13 +44,12 @@
 import reactor.core.publisher.Mono;
 
 /**
- * Parses the fileReady event and creates an array of FileData containing the information.
+ * Parses the fileReady event and creates a Flux of FileReadyMessage containing the information.
  *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
  */
-public class DmaapConsumerJsonParser {
-    private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
+public class JsonMessageParser {
+    private static final Logger logger = LoggerFactory.getLogger(JsonMessageParser.class);
 
     private static final String COMMON_EVENT_HEADER = "commonEventHeader";
     private static final String EVENT_NAME = "eventName";
@@ -83,31 +88,8 @@
         }
     }
 
-    /**
-     * Extract info from string and create a {@link FileData}.
-     *
-     * @param rawMessage - results from DMaaP
-     * @return reactive Mono with an array of FileData
-     */
-    public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
-        return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
-    }
-
-    private Mono<JsonElement> getJsonParserMessage(String message) {
-        logger.trace("original message from message router: {}", message);
-        return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
-    }
-
-    private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
-        return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
-                : getFileDataFromJsonArray(jsonElement);
-    }
-
-    private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
-        return create(
-                Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
-                        .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
-                                .orElseGet(JsonObject::new)))));
+    public Flux<FileReadyMessage> getMessagesFromJson(Mono<String> rawMessage) {
+        return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createMessageData);
     }
 
     public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
@@ -117,20 +99,54 @@
                         : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
     }
 
-    private Flux<FileData> create(Flux<JsonObject> jsonObject) {
-        return jsonObject
-                .flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
-                        ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + jsonObject)
-                        : transform(monoJsonP));
+    private Flux<FileReadyMessage> getMessagesFromJsonArray(JsonElement jsonElement) {
+        return createMessages(
+                Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+                        .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+                                .orElseGet(JsonObject::new)))));
     }
 
-    private Flux<FileData> transform(JsonObject message) {
-        Optional<FileMetaData> fileMetaData = getFileMetaData(message);
-        if (fileMetaData.isPresent()) {
+    /**
+     * Extract info from string and create a Flux of {@link FileReadyMessage}.
+     *
+     * @param rawMessage - results from DMaaP
+     * @return reactive Flux of FileReadyMessages
+     */
+    private Flux<FileReadyMessage> createMessageData(JsonElement jsonElement) {
+        return jsonElement.isJsonObject() ? createMessages(Flux.just(jsonElement.getAsJsonObject()))
+                : getMessagesFromJsonArray(jsonElement);
+    }
+
+    private Mono<JsonElement> getJsonParserMessage(String message) {
+        logger.trace("original message from message router: {}", message);
+        return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
+    }
+
+    private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
+        return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
+                ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)
+                : transformMessages(monoJsonP));
+    }
+
+    private Flux<FileReadyMessage> transformMessages(JsonObject message) {
+        Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
+        if (optionalMessageMetaData.isPresent()) {
             JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
             JsonArray arrayOfNamedHashMap = notificationFields.getAsJsonArray(ARRAY_OF_NAMED_HASH_MAP);
             if (arrayOfNamedHashMap != null) {
-                return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap);
+                List<FileData> allFileDataFromJson = getAllFileDataFromJson(arrayOfNamedHashMap);
+                if (!allFileDataFromJson.isEmpty()) {
+                    MessageMetaData messageMetaData = optionalMessageMetaData.get();
+                    // @formatter:off
+                    return Flux.just(ImmutableFileReadyMessage.builder()
+                            .pnfName(messageMetaData.sourceName())
+                            .messageMetaData(messageMetaData)
+                            .files(allFileDataFromJson)
+                            .build());
+                    // @formatter:on
+                } else {
+                    return Flux.empty();
+                }
             }
 
             logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
@@ -140,7 +156,7 @@
         return Flux.empty();
     }
 
-    private Optional<FileMetaData> getFileMetaData(JsonObject message) {
+    private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
         List<String> missingValues = new ArrayList<>();
         JsonObject commonEventHeader = message.getAsJsonObject(EVENT).getAsJsonObject(COMMON_EVENT_HEADER);
         String eventName = getValueFromJson(commonEventHeader, EVENT_NAME, missingValues);
@@ -154,7 +170,7 @@
         getValueFromJson(notificationFields, NOTIFICATION_FIELDS_VERSION, missingValues);
 
         // @formatter:off
-        FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+        MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
                 .productName(getDataFromEventName(EventNameDataType.PRODUCT_NAME, eventName, missingValues))
                 .vendorName(getDataFromEventName(EventNameDataType.VENDOR_NAME, eventName, missingValues))
                 .lastEpochMicrosec(getValueFromJson(commonEventHeader, LAST_EPOCH_MICROSEC, missingValues))
@@ -166,7 +182,7 @@
                 .build();
         // @formatter:on
         if (missingValues.isEmpty() && isChangeIdentifierCorrect(changeIdentifier) && isChangeTypeCorrect(changeType)) {
-            return Optional.of(fileMetaData);
+            return Optional.of(messageMetaData);
         } else {
             String errorMessage = "Unable to collect file from xNF.";
             if (!missingValues.isEmpty()) {
@@ -189,32 +205,40 @@
         return FILE_READY_CHANGE_IDENTIFIER.equals(changeIdentifier);
     }
 
-    private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) {
+    private List<FileData> getAllFileDataFromJson(JsonArray arrayOfAdditionalFields) {
         List<FileData> res = new ArrayList<>();
         for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
             JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
-            Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
+            Optional<FileData> fileData = getFileDataFromJson(fileInfo);
 
             if (fileData.isPresent()) {
                 res.add(fileData.get());
             }
         }
-        return Flux.fromIterable(res);
+        return res;
     }
 
-    private Optional<FileData> getFileDataFromJson(FileMetaData fileMetaData, JsonObject fileInfo) {
+    private Optional<FileData> getFileDataFromJson(JsonObject fileInfo) {
         logger.trace("starting to getFileDataFromJson!");
 
         List<String> missingValues = new ArrayList<>();
         JsonObject data = fileInfo.getAsJsonObject(HASH_MAP);
 
+        String location = getValueFromJson(data, LOCATION, missingValues);
+        Scheme scheme;
+        try {
+            scheme = Scheme.getSchemeFromString(URI.create(location).getScheme());
+        } catch (Exception e) {
+            logger.error("Unable to collect file from xNF.", e);
+            return Optional.empty();
+        }
         // @formatter:off
         FileData fileData = ImmutableFileData.builder()
-                .fileMetaData(fileMetaData)
                 .name(getValueFromJson(fileInfo, NAME, missingValues))
                 .fileFormatType(getValueFromJson(data, FILE_FORMAT_TYPE, missingValues))
                 .fileFormatVersion(getValueFromJson(data, FILE_FORMAT_VERSION, missingValues))
-                .location(getValueFromJson(data, LOCATION, missingValues))
+                .location(location)
+                .scheme(scheme)
                 .compression(getValueFromJson(data, COMPRESSION, missingValues))
                 .build();
         // @formatter:on
@@ -260,7 +284,7 @@
         return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
     }
 
-    private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) {
+    private Flux<FileReadyMessage> logErrorAndReturnEmptyMessageFlux(String errorMessage) {
         logger.error(errorMessage);
         return Flux.empty();
     }
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
new file mode 100644
index 0000000..c41dce5
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTask.java
@@ -0,0 +1,83 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.Config;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
+import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.web.reactive.function.client.WebClient;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class DMaaPMessageConsumerTask {
+    private static final Logger logger = LoggerFactory.getLogger(DMaaPMessageConsumerTask.class);
+
+    private Config datafileAppConfig;
+    private JsonMessageParser jsonMessageParser;
+    private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
+
+    public DMaaPMessageConsumerTask(AppConfig datafileAppConfig) {
+        this.datafileAppConfig = datafileAppConfig;
+        this.jsonMessageParser = new JsonMessageParser();
+    }
+
+    protected DMaaPMessageConsumerTask(AppConfig datafileAppConfig,
+                                    DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
+            JsonMessageParser messageParser) {
+        this.datafileAppConfig = datafileAppConfig;
+        this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
+        this.jsonMessageParser = messageParser;
+    }
+
+    public Flux<FileReadyMessage> execute() {
+        dmaaPConsumerReactiveHttpClient = resolveClient();
+        logger.trace("execute called");
+        return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
+    }
+
+    private Flux<FileReadyMessage> consume(Mono<String> message) {
+        logger.trace("consume called with arg {}", message);
+        return jsonMessageParser.getMessagesFromJson(message);
+    }
+
+    protected DmaapConsumerConfiguration resolveConfiguration() {
+        return datafileAppConfig.getDmaapConsumerConfiguration();
+    }
+
+    protected DMaaPConsumerReactiveHttpClient resolveClient() {
+        return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient());
+    }
+
+    protected WebClient buildWebClient() {
+        return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
+    }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
new file mode 100644
index 0000000..b65ddd6
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -0,0 +1,86 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import java.time.Duration;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.Config;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+
+import reactor.core.publisher.Flux;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class DataRouterPublisher {
+
+    private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
+    private final Config datafileAppConfig;
+
+    public DataRouterPublisher(AppConfig datafileAppConfig) {
+        this.datafileAppConfig = datafileAppConfig;
+    }
+
+
+    /**
+     * Publish one file
+     * @param consumerDmaapModel information about the file to publish
+     * @param maxNumberOfRetries the maximal number of retries if the publishing fails
+     * @param firstBackoffTimeout the time to delay the first retry
+     * @return the HTTP response status as a string
+     */
+    public Flux<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
+        logger.trace("Method called with arg {}", model);
+        DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
+
+        //@formatter:off
+        return Flux.just(model)
+                .cache(1)
+                .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse)
+                .flatMap(httpStatus -> handleHttpResponse(httpStatus, model))
+                .retryBackoff(numRetries, firstBackoff);
+        //@formatter:on
+    }
+
+    private Flux<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
+
+        if (HttpUtils.isSuccessfulResponseCode(response.value())) {
+            logger.trace("Publish to DR successful!");
+            return Flux.just(model);
+        } else {
+            logger.warn("Publish to DR unsuccessful, response code: " + response);
+            return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response));
+        }
+    }
+
+
+    DmaapPublisherConfiguration resolveConfiguration() {
+        return datafileAppConfig.getDmaapPublisherConfiguration();
+    }
+
+    DmaapProducerReactiveHttpClient resolveClient() {
+        return new DmaapProducerReactiveHttpClient(resolveConfiguration());
+    }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
deleted file mode 100644
index 4fbc17f..0000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTask.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapReactiveWebClient;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.springframework.web.reactive.function.client.WebClient;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-abstract class DmaapConsumerTask {
-
-    abstract Flux<FileData> consume(Mono<String> message) throws DmaapNotFoundException;
-
-    abstract DMaaPConsumerReactiveHttpClient resolveClient();
-
-    abstract void initConfigs();
-
-    protected abstract DmaapConsumerConfiguration resolveConfiguration();
-
-    protected abstract Flux<FileData> execute(String object);
-
-    WebClient buildWebClient() {
-        return new DmaapReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
-    }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
deleted file mode 100644
index 5bd0bf3..0000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImpl.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-@Component
-public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
-
-    private static final Logger logger = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
-
-    private Config datafileAppConfig;
-    private DmaapConsumerJsonParser dmaapConsumerJsonParser;
-    private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
-
-    @Autowired
-    public DmaapConsumerTaskImpl(AppConfig datafileAppConfig) {
-        this.datafileAppConfig = datafileAppConfig;
-        this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
-    }
-
-    protected DmaapConsumerTaskImpl(AppConfig datafileAppConfig,
-                                    DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient,
-                                    DmaapConsumerJsonParser dmaapConsumerJsonParser) {
-        this.datafileAppConfig = datafileAppConfig;
-        this.dmaaPConsumerReactiveHttpClient = dmaaPConsumerReactiveHttpClient;
-        this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
-    }
-
-    @Override
-    Flux<FileData> consume(Mono<String> message) {
-        logger.trace("consume called with arg {}", message);
-        return dmaapConsumerJsonParser.getJsonObject(message);
-    }
-
-    @Override
-    protected Flux<FileData> execute(String object) {
-        dmaaPConsumerReactiveHttpClient = resolveClient();
-        logger.trace("execute called with arg {}", object);
-        return consume((dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
-    }
-
-    @Override
-    void initConfigs() {
-        datafileAppConfig.initFileStreamReader();
-    }
-
-    @Override
-    protected DmaapConsumerConfiguration resolveConfiguration() {
-        return datafileAppConfig.getDmaapConsumerConfiguration();
-    }
-
-    @Override
-    protected DMaaPConsumerReactiveHttpClient resolveClient() {
-        return new DMaaPConsumerReactiveHttpClient(resolveConfiguration(), buildWebClient());
-    }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
deleted file mode 100644
index cb194cf..0000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTask.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
-
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import reactor.core.publisher.Flux;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-abstract class DmaapPublisherTask {
-
-    protected abstract DmaapPublisherConfiguration resolveConfiguration();
-
-    protected abstract DmaapProducerReactiveHttpClient resolveClient();
-
-    protected abstract Flux<String> execute(ConsumerDmaapModel consumerDmaapModel);
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
deleted file mode 100644
index 56a2fc2..0000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImpl.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import reactor.core.publisher.Flux;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-@Component
-public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
-
-    private static final Logger logger = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
-    private final Config datafileAppConfig;
-
-    @Autowired
-    public DmaapPublisherTaskImpl(AppConfig datafileAppConfig) {
-        this.datafileAppConfig = datafileAppConfig;
-    }
-
-    @Override
-    public Flux<String> execute(ConsumerDmaapModel consumerDmaapModel) {
-        logger.trace("Method called with arg {}", consumerDmaapModel);
-        DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
-        return dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel);
-    }
-
-    @Override
-    protected DmaapPublisherConfiguration resolveConfiguration() {
-        return datafileAppConfig.getDmaapPublisherConfiguration();
-    }
-
-    @Override
-    protected DmaapProducerReactiveHttpClient resolveClient() {
-        return new DmaapProducerReactiveHttpClient(resolveConfiguration());
-    }
-
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
new file mode 100644
index 0000000..db18ac2
--- /dev/null
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollector.java
@@ -0,0 +1,130 @@
+/*
+ * ============LICENSE_START======================================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ===============================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END========================================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.tasks;
+
+import java.nio.file.Path;
+import java.time.Duration;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.Config;
+import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
+import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+public class FileCollector {
+
+    private static final Logger logger = LoggerFactory.getLogger(FileCollector.class);
+    private Config datafileAppConfig;
+    private final FtpsClient ftpsClient;
+    private final SftpClient sftpClient;
+
+
+    public FileCollector(AppConfig datafileAppConfig, FtpsClient ftpsClient, SftpClient sftpClient) {
+        this.datafileAppConfig = datafileAppConfig;
+        this.ftpsClient = ftpsClient;
+        this.sftpClient = sftpClient;
+    }
+
+    public Mono<ConsumerDmaapModel> execute(FileData fileData, MessageMetaData metaData, long maxNumberOfRetries,
+            Duration firstBackoffTimeout) {
+        logger.trace("Entering execute with {}", fileData);
+        resolveKeyStore();
+
+        //@formatter:off
+        return Mono.just(fileData)
+            .cache()
+            .flatMap(fd -> collectFile(fileData, metaData))
+            .retryBackoff(maxNumberOfRetries, firstBackoffTimeout);
+        //@formatter:on
+    }
+
+    private FtpesConfig resolveConfiguration() {
+        return datafileAppConfig.getFtpesConfiguration();
+    }
+
+    private void resolveKeyStore() {
+        FtpesConfig ftpesConfig = resolveConfiguration();
+        ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
+        ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
+        ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
+        ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
+    }
+
+    private Mono<ConsumerDmaapModel> collectFile(FileData fileData, MessageMetaData metaData) {
+        logger.trace("starting to collectFile");
+
+        final String remoteFile = fileData.remoteFilePath();
+        final Path localFile = fileData.getLocalFileName();
+
+        try {
+            localFile.getParent().toFile().mkdir(); // Create parent directories
+
+            FileCollectClient currentClient = selectClient(fileData);
+
+            currentClient.collectFile(remoteFile, localFile);
+            return Mono.just(getConsumerDmaapModel(fileData, metaData, localFile));
+        } catch (Exception throwable) {
+            logger.warn("Failed to download file: {}, reason: {}", fileData.name(), throwable);
+            return Mono.error(throwable);
+        }
+    }
+
+    private FileCollectClient selectClient(FileData fileData) throws DatafileTaskException {
+        switch (fileData.scheme()) {
+            case SFTP:
+                return sftpClient;
+            case FTPS:
+                return ftpsClient;
+            default:
+                throw new DatafileTaskException("Unhandeled protocol: " + fileData.scheme());
+        }
+    }
+
+    private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, MessageMetaData metaData, Path localFile) {
+        String location = fileData.location();
+
+        // @formatter:off
+        return ImmutableConsumerDmaapModel.builder()
+                .productName(metaData.productName())
+                .vendorName(metaData.vendorName())
+                .lastEpochMicrosec(metaData.lastEpochMicrosec())
+                .sourceName(metaData.sourceName())
+                .startEpochMicrosec(metaData.startEpochMicrosec())
+                .timeZoneOffset(metaData.timeZoneOffset())
+                .name(fileData.name())
+                .location(location)
+                .internalLocation(localFile.toString())
+                .compression(fileData.compression())
+                .fileFormatType(fileData.fileFormatType())
+                .fileFormatVersion(fileData.fileFormatVersion())
+                .build();
+        // @formatter:on
+    }
+}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java
deleted file mode 100644
index 7e08f12..0000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/RetryTimer.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-public class RetryTimer {
-    public void waitRetryTime() {
-        try {
-            Thread.sleep(60000);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-    }
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index c465fe9..f22c7bf 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -16,15 +16,31 @@
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
+import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 /**
@@ -34,25 +50,37 @@
 @Component
 public class ScheduledTasks {
 
-    private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+    private static final int MAX_NUMBER_OF_CONCURRENT_TASKS = 200;
 
-    private final DmaapConsumerTask dmaapConsumerTask;
-    private final XnfCollectorTask xnfCollectorTask;
-    private final DmaapPublisherTask dmaapProducerTask;
+    /** Data needed for fetching of files from one PNF */
+    private class FileCollectionData {
+        final FileData fileData;
+        final FileCollector collectorTask; // Same object, ftp session etc. can be used for each file in one VES
+                                                  // event
+        final MessageMetaData metaData;
+
+        FileCollectionData(FileData fd, FileCollector collectorTask, MessageMetaData metaData) {
+            this.fileData = fd;
+            this.collectorTask = collectorTask;
+            this.metaData = metaData;
+        }
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+    private final AppConfig applicationConfiguration;
+    private final AtomicInteger taskCounter = new AtomicInteger();
+    private final Set<Path> alreadyPublishedFiles = Collections.synchronizedSet(new HashSet<Path>());
 
     /**
      * Constructor for task registration in Datafile Workflow.
      *
-     * @param dmaapConsumerTask - fist task
+     * @param applicationConfiguration - application configuration
      * @param xnfCollectorTask - second task
      * @param dmaapPublisherTask - third task
      */
     @Autowired
-    public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, XnfCollectorTask xnfCollectorTask,
-            DmaapPublisherTask dmaapPublisherTask) {
-        this.dmaapConsumerTask = dmaapConsumerTask;
-        this.xnfCollectorTask = xnfCollectorTask;
-        this.dmaapProducerTask = dmaapPublisherTask;
+    public ScheduledTasks(AppConfig applicationConfiguration) {
+        this.applicationConfiguration = applicationConfiguration;
     }
 
     /**
@@ -60,17 +88,20 @@
      */
     public void scheduleMainDatafileEventTask() {
         logger.trace("Execution of tasks was registered");
+        applicationConfiguration.initFileStreamReader();
         //@formatter:off
-        consumeFromDmaapMessage()
-                .publishOn(Schedulers.parallel())
-                .cache()
-                .doOnError(DmaapEmptyResponseException.class, error -> logger.info("Nothing to consume from DMaaP"))
-                .flatMap(this::collectFilesFromXnf)
-                .retry(3)
-                .cache()
-                .flatMap(this::publishToDmaapConfiguration)
-                .retry(3)
-                .subscribe(this::onSuccess, this::onError, this::onComplete);
+        consumeMessagesFromDmaap()
+            .parallel() // Each FileReadyMessage in a separate thread
+            .runOn(Schedulers.parallel())
+            .flatMap(this::createFileCollectionTask)
+            .filter(this::shouldBePublished)
+            .doOnNext(fileData -> taskCounter.incrementAndGet())
+            .flatMap(this::collectFileFromXnf)
+            .flatMap(this::publishToDataRouter)
+            .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation())))
+            .doOnNext(model -> taskCounter.decrementAndGet())
+            .sequential()
+            .subscribe(this::onSuccess, this::onError, this::onComplete);
         //@formatter:on
     }
 
@@ -78,26 +109,91 @@
         logger.info("Datafile tasks have been completed");
     }
 
-    private void onSuccess(String responseCode) {
-        logger.info("Datafile consumed tasks. HTTP Response code {}", responseCode);
+    private void onSuccess(Path localFile) {
+        logger.info("Datafile consumed tasks." + localFile);
     }
 
     private void onError(Throwable throwable) {
-        if (!(throwable instanceof DmaapEmptyResponseException)) {
-            logger.error("Chain of tasks have been aborted due to errors in Datafile workflow", throwable);
+        logger.error("Chain of tasks have been aborted due to errors in Datafile workflow {}", throwable);
+    }
+
+    private Flux<FileCollectionData> createFileCollectionTask(FileReadyMessage availableFiles) {
+        List<FileCollectionData> fileCollects = new ArrayList<>();
+
+        for (FileData fileData : availableFiles.files()) {
+            FileCollector task = new FileCollector(applicationConfiguration,
+                    new FtpsClient(fileData.fileServerData()), new SftpClient(fileData.fileServerData()));
+            fileCollects.add(new FileCollectionData(fileData, task, availableFiles.messageMetaData()));
         }
+        return Flux.fromIterable(fileCollects);
     }
 
-    private Flux<FileData> consumeFromDmaapMessage() {
-        dmaapConsumerTask.initConfigs();
-        return dmaapConsumerTask.execute("");
+    private boolean shouldBePublished(FileCollectionData task) {
+        return alreadyPublishedFiles.add(task.fileData.getLocalFileName());
     }
 
-    private Flux<ConsumerDmaapModel> collectFilesFromXnf(FileData fileData) {
-        return xnfCollectorTask.execute(fileData);
+    private Mono<ConsumerDmaapModel> collectFileFromXnf(FileCollectionData fileCollect) {
+        final long maxNUmberOfRetries = 3;
+        final Duration initialRetryTimeout = Duration.ofSeconds(5);
+
+        return fileCollect.collectorTask
+                .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout)
+                .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception));
     }
 
-    private Flux<String> publishToDmaapConfiguration(ConsumerDmaapModel monoModel) {
-        return dmaapProducerTask.execute(monoModel);
+    private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Throwable exception) {
+        logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage());
+        deleteFile(fileData.getLocalFileName());
+        alreadyPublishedFiles.remove(fileData.getLocalFileName());
+        taskCounter.decrementAndGet();
+        return Mono.empty();
+    }
+
+    private Flux<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
+        final long maxNumberOfRetries = 3;
+        final Duration initialRetryTimeout = Duration.ofSeconds(5);
+
+        DataRouterPublisher publisherTask = new DataRouterPublisher(applicationConfiguration);
+
+        return publisherTask.execute(model, maxNumberOfRetries, initialRetryTimeout)
+                .onErrorResume(exception -> handlePublishFailure(model, exception));
+
+    }
+
+    private Flux<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
+        logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
+        Path internalFileName = Paths.get(model.getInternalLocation());
+        deleteFile(internalFileName);
+        alreadyPublishedFiles.remove(internalFileName);
+        taskCounter.decrementAndGet();
+        return Flux.empty();
+    }
+
+    private Flux<FileReadyMessage> consumeMessagesFromDmaap() {
+        final int currentNumberOfTasks = taskCounter.get();
+        logger.trace("Consuming new file ready messages, current number of tasks: {}", currentNumberOfTasks);
+        if (currentNumberOfTasks > MAX_NUMBER_OF_CONCURRENT_TASKS) {
+            return Flux.empty();
+        }
+
+        final DMaaPMessageConsumerTask messageConsumerTask =
+                new DMaaPMessageConsumerTask(this.applicationConfiguration);
+        return messageConsumerTask.execute()
+                .onErrorResume(exception -> handleConsumeMessageFailure(exception));
+    }
+
+    private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) {
+        logger.error("Polling for file ready message filed, exception: {}", exception);
+        return Flux.empty();
+    }
+
+    private Flux<Path> deleteFile(Path localFile) {
+        logger.trace("Deleting file: {}", localFile);
+        try {
+            Files.delete(localFile);
+        } catch (Exception e) {
+            logger.warn("Could not delete file: {}, {}", localFile, e);
+        }
+        return Flux.just(localFile);
     }
 }
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java
deleted file mode 100644
index b98d40d..0000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTask.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-
-import reactor.core.publisher.Flux;
-
-/**
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-public interface XnfCollectorTask {
-    abstract FtpesConfig resolveConfiguration();
-    Flux<ConsumerDmaapModel> execute(FileData fileData);
-}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
deleted file mode 100644
index c03d903..0000000
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImpl.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.tasks;
-
-import java.io.File;
-import java.net.URI;
-
-import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.configuration.Config;
-import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
-import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
-import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import reactor.core.publisher.Flux;
-
-/**
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-@Component
-public class XnfCollectorTaskImpl implements XnfCollectorTask {
-
-    private static final String FTPES = "ftpes";
-    private static final String FTPS = "ftps";
-    private static final String SFTP = "sftp";
-    private static final Logger logger = LoggerFactory.getLogger(XnfCollectorTaskImpl.class);
-    private Config datafileAppConfig;
-    private final FtpsClient ftpsClient;
-    private final SftpClient sftpClient;
-    private RetryTimer retryTimer;
-
-    @Autowired
-    protected XnfCollectorTaskImpl(AppConfig datafileAppConfig, FtpsClient ftpsCleint, SftpClient sftpClient) {
-        this.datafileAppConfig = datafileAppConfig;
-        this.ftpsClient = ftpsCleint;
-        this.sftpClient = sftpClient;
-    }
-
-    @Override
-    public Flux<ConsumerDmaapModel> execute(FileData fileData) {
-        logger.trace("Entering execute with {}", fileData);
-        resolveKeyStore();
-
-        String localFile = collectFile(fileData);
-
-        if (localFile != null) {
-            ConsumerDmaapModel consumerDmaapModel = getConsumerDmaapModel(fileData, localFile);
-            logger.trace("Exiting execute with {}", consumerDmaapModel);
-            return Flux.just(consumerDmaapModel);
-        }
-        logger.trace("Exiting execute with empty");
-        return Flux.empty();
-    }
-
-    @Override
-    public FtpesConfig resolveConfiguration() {
-        return datafileAppConfig.getFtpesConfiguration();
-    }
-
-    private void resolveKeyStore() {
-        FtpesConfig ftpesConfig = resolveConfiguration();
-        ftpsClient.setKeyCertPath(ftpesConfig.keyCert());
-        ftpsClient.setKeyCertPassword(ftpesConfig.keyPassword());
-        ftpsClient.setTrustedCAPath(ftpesConfig.trustedCA());
-        ftpsClient.setTrustedCAPassword(ftpesConfig.trustedCAPassword());
-    }
-
-    private String collectFile(FileData fileData) {
-        logger.trace("starting to collectFile");
-        String location = fileData.location();
-        URI uri = URI.create(location);
-        FileServerData fileServerData = getFileServerData(uri);
-        String remoteFile = uri.getPath();
-        String localFile = "target" + File.separator + fileData.name();
-
-        FileCollectClient currentClient = selectClient(fileData, uri);
-
-        if (currentClient != null) {
-            FileCollectResult fileCollectResult = currentClient.collectFile(fileServerData, remoteFile, localFile);
-            if (!fileCollectResult.downloadSuccessful()) {
-                fileCollectResult = retry(fileCollectResult, currentClient);
-            }
-            if (!fileCollectResult.downloadSuccessful()) {
-                localFile = null;
-                logger.error("Download of file aborted after maximum number of retries. Data: {} Error causes {}",
-                        fileServerData, fileCollectResult.getErrorData());
-            }
-        } else {
-            localFile = null;
-        }
-        return localFile;
-    }
-
-    private FileServerData getFileServerData(URI uri) {
-        String[] userInfo = getUserNameAndPasswordIfGiven(uri.getUserInfo());
-        // @formatter:off
-        return ImmutableFileServerData.builder()
-                .serverAddress(uri.getHost())
-                .userId(userInfo != null ? userInfo[0] : "")
-                .password(userInfo != null ? userInfo[1] : "")
-                .port(uri.getPort())
-                .build();
-        // @formatter:on
-    }
-
-    private String[] getUserNameAndPasswordIfGiven(String userInfoString) {
-        String[] userInfo = null;
-        if (userInfoString != null && !userInfoString.isEmpty()) {
-            userInfo = userInfoString.split(":");
-        }
-        return userInfo;
-    }
-
-    private FileCollectClient selectClient(FileData fileData, URI uri) {
-        FileCollectClient selectedClient = null;
-        String scheme = uri.getScheme();
-        if (FTPES.equals(scheme) || FTPS.equals(scheme)) {
-            selectedClient = ftpsClient;
-        } else if (SFTP.equals(scheme)) {
-            selectedClient = sftpClient;
-        } else {
-            logger.error("DFC does not support protocol {}. Supported protocols are {}, {}, and {}. Data: {}", scheme,
-                    FTPES, FTPS, SFTP, fileData);
-        }
-        return selectedClient;
-    }
-
-    private FileCollectResult retry(FileCollectResult fileCollectResult, FileCollectClient fileCollectClient) {
-        int retryCount = 1;
-        FileCollectResult newResult = fileCollectResult;
-        while (!newResult.downloadSuccessful() && retryCount++ < 3) {
-            getRetryTimer().waitRetryTime();
-            newResult = fileCollectClient.retryCollectFile();
-        }
-        return newResult;
-    }
-
-    private ConsumerDmaapModel getConsumerDmaapModel(FileData fileData, String localFile) {
-        String productName = fileData.fileMetaData().productName();
-        String vendorName = fileData.fileMetaData().vendorName();
-        String lastEpochMicrosec = fileData.fileMetaData().lastEpochMicrosec();
-        String sourceName = fileData.fileMetaData().sourceName();
-        String startEpochMicrosec = fileData.fileMetaData().startEpochMicrosec();
-        String timeZoneOffset = fileData.fileMetaData().timeZoneOffset();
-        String name = fileData.name();
-        String location = fileData.location();
-        String internalLocation = localFile;
-        String compression = fileData.compression();
-        String fileFormatType = fileData.fileFormatType();
-        String fileFormatVersion = fileData.fileFormatVersion();
-
-        // @formatter:off
-        return ImmutableConsumerDmaapModel.builder()
-                .productName(productName)
-                .vendorName(vendorName)
-                .lastEpochMicrosec(lastEpochMicrosec)
-                .sourceName(sourceName)
-                .startEpochMicrosec(startEpochMicrosec)
-                .timeZoneOffset(timeZoneOffset)
-                .name(name)
-                .location(location)
-                .internalLocation(internalLocation)
-                .compression(compression)
-                .fileFormatType(fileFormatType)
-                .fileFormatVersion(fileFormatVersion)
-                .build();
-        // @formatter:on
-    }
-
-    private RetryTimer getRetryTimer() {
-        if (retryTimer == null) {
-            retryTimer = new RetryTimer();
-        }
-        return retryTimer;
-    }
-
-    protected void setRetryTimer(RetryTimer retryTimer) {
-        this.retryTimer = retryTimer;
-    }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
index 3299f71..acae1e6 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
@@ -30,27 +30,54 @@
 class CloudConfigParserTest {
 
     private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG =
-            new ImmutableDmaapConsumerConfiguration.Builder().timeoutMs(-1)
-                    .dmaapHostName("message-router.onap.svc.cluster.local").dmaapUserName("admin")
-                    .dmaapUserPassword("admin").dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT")
-                    .dmaapPortNumber(2222).dmaapContentType("application/json").messageLimit(-1).dmaapProtocol("http")
-                    .consumerId("C12").consumerGroup("OpenDCAE-c12").trustStorePath("trustStorePath")
-                    .trustStorePasswordPath("trustStorePasswordPath").keyStorePath("keyStorePath")
-                    .keyStorePasswordPath("keyStorePasswordPath").enableDmaapCertAuth(true)
+            //@formatter:on
+            new ImmutableDmaapConsumerConfiguration.Builder()
+                    .timeoutMs(-1)
+                    .dmaapHostName("message-router.onap.svc.cluster.local")
+                    .dmaapUserName("admin")
+                    .dmaapUserPassword("admin")
+                    .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT")
+                    .dmaapPortNumber(2222)
+                    .dmaapContentType("application/json")
+                    .messageLimit(-1)
+                    .dmaapProtocol("http")
+                    .consumerId("C12")
+                    .consumerGroup("OpenDCAE-c12")
+                    .trustStorePath("trustStorePath")
+                    .trustStorePasswordPath("trustStorePasswordPath")
+                    .keyStorePath("keyStorePath")
+                    .keyStorePasswordPath("keyStorePasswordPath")
+                    .enableDmaapCertAuth(true)
                     .build();
+            //@formatter:off
 
     private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG =
-            new ImmutableDmaapPublisherConfiguration.Builder().dmaapTopicName("publish").dmaapUserPassword("dradmin")
-                    .dmaapPortNumber(3907).dmaapProtocol("https").dmaapContentType("application/json")
-                    .dmaapHostName("message-router.onap.svc.cluster.local").dmaapUserName("dradmin")
+            //@formatter:on
+            new ImmutableDmaapPublisherConfiguration.Builder()
+                    .dmaapTopicName("publish")
+                    .dmaapUserPassword("dradmin")
+                    .dmaapPortNumber(3907)
+                    .dmaapProtocol("https")
+                    .dmaapContentType("application/json")
+                    .dmaapHostName("message-router.onap.svc.cluster.local")
+                    .dmaapUserName("dradmin")
                     .trustStorePath("trustStorePath")
-                    .trustStorePasswordPath("trustStorePasswordPath").keyStorePath("keyStorePath")
-                    .keyStorePasswordPath("keyStorePasswordPath").enableDmaapCertAuth(true)
+                    .trustStorePasswordPath("trustStorePasswordPath")
+                    .keyStorePath("keyStorePath")
+                    .keyStorePasswordPath("keyStorePasswordPath")
+                    .enableDmaapCertAuth(true)
                     .build();
+            //@formatter:off
 
     private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION =
-            new ImmutableFtpesConfig.Builder().keyCert("/config/ftpKey.jks").keyPassword("secret")
-                    .trustedCA("config/cacerts").trustedCAPassword("secret").build();
+            //@formatter:on
+            new ImmutableFtpesConfig.Builder()
+                    .keyCert("/config/ftpKey.jks")
+                    .keyPassword("secret")
+                    .trustedCA("config/cacerts")
+                    .trustedCAPassword("secret")
+                    .build();
+            //@formatter:off
 
     private CloudConfigParser cloudConfigParser = new CloudConfigParser(getCloudConfigJsonObject());
 
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
index 6230279..2cd854a 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/DatafileAppConfigTest.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
index b5f05a7..efb762a 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
@@ -1,18 +1,16 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  * ============LICENSE_END========================================================================
  */
 
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
new file mode 100644
index 0000000..1f5827c
--- /dev/null
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FileDataTest.java
@@ -0,0 +1,125 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public class FileDataTest {
+    private static final String FTPES_SCHEME = "ftpes://";
+    private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+    private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
+    private static final String USER = "usr";
+    private static final String PWD = "pwd";
+    private static final String SERVER_ADDRESS = "192.168.0.101";
+    private static final int PORT_22 = 22;
+    private static final String LOCATION_WITH_USER =
+            FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+    private static final String LOCATION_WITHOUT_USER =
+            FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+
+    private FileData properFileDataWithUser() {
+        // @formatter:off
+        return ImmutableFileData.builder()
+         .name("name")
+         .location(LOCATION_WITH_USER)
+         .compression("comp")
+         .fileFormatType("type")
+         .fileFormatVersion("version")
+         .scheme(Scheme.FTPS)
+         .build();
+        // @formatter:on
+    }
+
+    private FileData properFileDataWithoutUser() {
+        // @formatter:off
+        return ImmutableFileData.builder()
+            .name("name")
+            .location(LOCATION_WITHOUT_USER)
+            .compression("comp")
+            .fileFormatType("type")
+            .fileFormatVersion("version")
+            .scheme(Scheme.FTPS)
+            .build();
+        // @formatter:on
+    }
+
+    @Test
+    public void fileServerData_properLocationWithUser() {
+        // @formatter:off
+        ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder()
+                .serverAddress(SERVER_ADDRESS)
+                .port(PORT_22)
+                .userId(USER)
+                .password(PWD)
+                .build();
+        // @formatter:on
+
+        FileServerData actualFileServerData = properFileDataWithUser().fileServerData();
+        assertEquals(expectedFileServerData, actualFileServerData);
+    }
+
+    @Test
+    public void fileServerData_properLocationWithoutUser() {
+        // @formatter:off
+        ImmutableFileServerData expectedFileServerData = ImmutableFileServerData.builder()
+                .serverAddress(SERVER_ADDRESS)
+                .port(PORT_22)
+                .userId("")
+                .password("")
+                .build();
+        // @formatter:on
+
+        FileServerData actualFileServerData = properFileDataWithoutUser().fileServerData();
+        assertEquals(expectedFileServerData, actualFileServerData);
+        assertTrue(expectedFileServerData.port().isPresent());
+    }
+
+    @Test
+    public void remoteLocation_properLocation() {
+        String actualRemoteFilePath = properFileDataWithUser().remoteFilePath();
+        assertEquals(REMOTE_FILE_LOCATION, actualRemoteFilePath);
+    }
+
+    @Test
+    public void fileServerData_properLocationWithoutPort() {
+        // @formatter:off
+        ImmutableFileServerData fileServerData = ImmutableFileServerData.builder()
+                .serverAddress(SERVER_ADDRESS)
+                .userId("")
+                .password("")
+                .build();
+        // @formatter:on
+
+        assertFalse(fileServerData.port().isPresent());
+    }
+
+
+}
+
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
similarity index 72%
rename from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
rename to datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index 0ae9ece..f7b8329 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -1,17 +1,19 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
  */
 
 package org.onap.dcaegen2.collectors.datafile.service;
@@ -21,15 +23,20 @@
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
 
@@ -40,7 +47,7 @@
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
  */
-class DmaapConsumerJsonParserTest {
+class JsonMessageParserTest {
     private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady";
     private static final String PRODUCT_NAME = "NrRadio";
     private static final String VENDOR_NAME = "Ericsson";
@@ -60,7 +67,7 @@
     private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
 
     @Test
-    void whenPassingCorrectJson_oneFileData() throws DmaapNotFoundException {
+    void whenPassingCorrectJson_oneFileReadyMessage() throws DmaapNotFoundException {
         // @formatter:off
         AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
                 .name(PM_FILE_NAME)
@@ -77,7 +84,7 @@
                 .addAdditionalField(additionalField)
                 .build();
 
-        FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+        MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
                 .productName(PRODUCT_NAME)
                 .vendorName(VENDOR_NAME)
                 .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -88,27 +95,34 @@
                 .changeType(CHANGE_TYPE)
                 .build();
         FileData expectedFileData = ImmutableFileData.builder()
-                .fileMetaData(fileMetaData)
                 .name(PM_FILE_NAME)
                 .location(LOCATION)
+                .scheme(Scheme.FTPS)
                 .compression(GZIP_COMPRESSION)
                 .fileFormatType(FILE_FORMAT_TYPE)
                 .fileFormatVersion(FILE_FORMAT_VERSION)
                 .build();
+        List<FileData> files = new ArrayList<>();
+        files.add(expectedFileData);
+        FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
+                .pnfName(SOURCE_NAME)
+                .messageMetaData(messageMetaData)
+                .files(files)
+                .build();
         // @formatter:on
         String messageString = message.toString();
         String parsedString = message.getParsed();
-        DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+        JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
-                .expectNext(expectedFileData).verifyComplete();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+                .expectSubscription().expectNext(expectedMessage).verifyComplete();
     }
 
     @Test
-    void whenPassingCorrectJsonWithTwoEvents_twoFileData() throws DmaapNotFoundException {
+    void whenPassingCorrectJsonWithTwoEvents_twoMessages() throws DmaapNotFoundException {
         // @formatter:off
         AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
                 .name(PM_FILE_NAME)
@@ -125,7 +139,7 @@
                 .addAdditionalField(additionalField)
                 .build();
 
-        FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+        MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
                 .productName(PRODUCT_NAME)
                 .vendorName(VENDOR_NAME)
                 .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -136,25 +150,62 @@
                 .changeType(CHANGE_TYPE)
                 .build();
         FileData expectedFileData = ImmutableFileData.builder()
-                .fileMetaData(fileMetaData)
                 .name(PM_FILE_NAME)
                 .location(LOCATION)
+                .scheme(Scheme.FTPS)
                 .compression(GZIP_COMPRESSION)
                 .fileFormatType(FILE_FORMAT_TYPE)
                 .fileFormatVersion(FILE_FORMAT_VERSION)
                 .build();
+        List<FileData> files = new ArrayList<>();
+        files.add(expectedFileData);
+        FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
+                .pnfName(SOURCE_NAME)
+                .messageMetaData(messageMetaData)
+                .files(files)
+                .build();
         // @formatter:on
         String parsedString = message.getParsed();
         String messageString = "[" + parsedString + "," + parsedString + "]";
-        DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+        JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
+        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
+                .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
-                .expectNext(expectedFileData).expectNext(expectedFileData).verifyComplete();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+                .expectSubscription().expectNext(expectedMessage).expectNext(expectedMessage).verifyComplete();
     }
 
     @Test
-    void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan()
-            throws DmaapNotFoundException {
+    void whenPassingCorrectJsonWithoutLocation_noMessage() {
+        // @formatter:off
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+                .name(PM_FILE_NAME)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder()
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalField)
+                .build();
+        // @formatter:on
+        String messageString = message.toString();
+        String parsedString = message.getParsed();
+        JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
+        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
+                .getJsonObjectFromAnArray(jsonElement);
+
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+                .expectSubscription().expectNextCount(0).verifyComplete();
+    }
+
+    @Test
+    void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan() throws DmaapNotFoundException {
         // @formatter:off
         AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
                 .name(PM_FILE_NAME)
@@ -171,7 +222,7 @@
                 .addAdditionalField(additionalField)
                 .build();
 
-        FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+        MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
                 .productName(PRODUCT_NAME)
                 .vendorName(VENDOR_NAME)
                 .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -182,20 +233,27 @@
                 .changeType(CHANGE_TYPE)
                 .build();
         FileData expectedFileData = ImmutableFileData.builder()
-                .fileMetaData(fileMetaData)
                 .name(PM_FILE_NAME)
                 .location(LOCATION)
+                .scheme(Scheme.FTPS)
                 .compression(GZIP_COMPRESSION)
                 .fileFormatType(FILE_FORMAT_TYPE)
                 .fileFormatVersion(FILE_FORMAT_VERSION)
                 .build();
+        List<FileData> files = new ArrayList<>();
+        files.add(expectedFileData);
+        FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
+                .pnfName(SOURCE_NAME)
+                .messageMetaData(messageMetaData)
+                .files(files)
+                .build();
         // @formatter:on
         String parsedString = message.getParsed();
         String messageString = "[{\"event\":{}}," + parsedString + "]";
-        DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+        JsonMessageParser jsonMessageParserUnderTest = new JsonMessageParser();
 
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
-                .expectNext(expectedFileData).verifyComplete();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+                .expectSubscription().expectNext(expectedMessage).verifyComplete();
     }
 
     @Test
@@ -217,13 +275,13 @@
         // @formatter:on
         String messageString = message.toString();
         String parsedString = message.getParsed();
-        DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+        JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
-                .expectComplete().verify();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+                .expectSubscription().expectComplete().verify();
     }
 
     @Test
@@ -245,13 +303,13 @@
         // @formatter:on
         String messageString = message.toString();
         String parsedString = message.getParsed();
-        DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+        JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
-                .expectNextCount(0).verifyComplete();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+                .expectSubscription().expectNextCount(0).verifyComplete();
     }
 
     @Test
@@ -266,41 +324,13 @@
         // @formatter:on
         String messageString = message.toString();
         String parsedString = message.getParsed();
-        DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+        JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
-        .getJsonObjectFromAnArray(jsonElement);
-
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
-        .expectNextCount(0).verifyComplete();
-    }
-
-    @Test
-    void whenPassingCorrectJsonWithoutLocation_noFileData() {
-        // @formatter:off
-        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
-                .name(PM_FILE_NAME)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
-                .build();
-        JsonMessage message = new JsonMessage.JsonMessageBuilder()
-                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
-                .changeIdentifier(CHANGE_IDENTIFIER)
-                .changeType(CHANGE_TYPE)
-                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
-                .addAdditionalField(additionalField)
-                .build();
-        // @formatter:on
-        String messageString = message.toString();
-        String parsedString = message.getParsed();
-        DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
-        JsonElement jsonElement = new JsonParser().parse(parsedString);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
-                .expectNextCount(0).verifyComplete();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+                .expectSubscription().expectNextCount(0).verifyComplete();
     }
 
     @Test
@@ -322,13 +352,13 @@
         // @formatter:on
         String messageString = message.toString();
         String parsedString = message.getParsed();
-        DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+        JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
-                .expectNextCount(0).verifyComplete();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+                .expectSubscription().expectNextCount(0).verifyComplete();
     }
 
     @Test
@@ -350,13 +380,13 @@
         // @formatter:on
         String messageString = message.toString();
         String parsedString = message.getParsed();
-        DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+        JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
-                .expectNextCount(0).verifyComplete();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+                .expectSubscription().expectNextCount(0).verifyComplete();
     }
 
     @Test
@@ -384,7 +414,7 @@
                 .addAdditionalField(additionalField)
                 .build();
 
-        FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+        MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
                 .productName(PRODUCT_NAME)
                 .vendorName(VENDOR_NAME)
                 .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -395,23 +425,30 @@
                 .changeType(CHANGE_TYPE)
                 .build();
         FileData expectedFileData = ImmutableFileData.builder()
-                .fileMetaData(fileMetaData)
                 .name(PM_FILE_NAME)
                 .location(LOCATION)
+                .scheme(Scheme.FTPS)
                 .compression(GZIP_COMPRESSION)
                 .fileFormatType(FILE_FORMAT_TYPE)
                 .fileFormatVersion(FILE_FORMAT_VERSION)
                 .build();
+        List<FileData> files = new ArrayList<>();
+        files.add(expectedFileData);
+        FileReadyMessage expectedMessage = ImmutableFileReadyMessage.builder()
+                .pnfName(SOURCE_NAME)
+                .messageMetaData(messageMetaData)
+                .files(files)
+                .build();
         // @formatter:on
         String messageString = message.toString();
         String parsedString = message.getParsed();
-        DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+        JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
-                .expectNext(expectedFileData).verifyComplete();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+                .expectSubscription().expectNext(expectedMessage).verifyComplete();
     }
 
     @Test
@@ -426,24 +463,24 @@
         // @formatter:on
         String incorrectMessageString = message.toString();
         String parsedString = message.getParsed();
-        DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+        JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(incorrectMessageString)))
                 .expectSubscription().expectComplete().verify();
     }
 
     @Test
     void whenPassingJsonWithNullJsonElement_noFileData() {
-        DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+        JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse("{}");
 
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just("[{}]"))).expectSubscription()
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just("[{}]"))).expectSubscription()
                 .expectComplete().verify();
     }
 
@@ -466,13 +503,13 @@
         // @formatter:on
         String messageString = message.toString();
         String parsedString = message.getParsed();
-        DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+        JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
-                .expectNextCount(0).expectComplete().verify();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+                .expectSubscription().expectNextCount(0).expectComplete().verify();
     }
 
     @Test
@@ -494,12 +531,12 @@
         // @formatter:on
         String messageString = message.toString();
         String parsedString = message.getParsed();
-        DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+        JsonMessageParser jsonMessageParserUnderTest = spy(new JsonMessageParser());
         JsonElement jsonElement = new JsonParser().parse(parsedString);
-        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(jsonMessageParserUnderTest)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
-                .expectComplete().verify();
+        StepVerifier.create(jsonMessageParserUnderTest.getMessagesFromJson(Mono.just(messageString)))
+                .expectSubscription().expectComplete().verify();
     }
 }
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
similarity index 73%
rename from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java
rename to datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
index f8f6cf6..f88e301 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
@@ -1,17 +1,21 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
  */
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
@@ -29,33 +33,32 @@
 
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.FileReadyMessage;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
-import org.onap.dcaegen2.collectors.datafile.service.DmaapConsumerJsonParser;
-
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.service.JsonMessageParser;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
 import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
-
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
 /**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
  */
-class DmaapConsumerTaskImplTest {
+public class DMaaPMessageConsumerTaskImplTest {
     private static final String NR_RADIO_ERICSSON_EVENT_NAME = "Noti_NrRadio-Ericsson_FileReady";
     private static final String PRODUCT_NAME = "NrRadio";
     private static final String VENDOR_NAME = "Ericsson";
@@ -82,14 +85,16 @@
 
     private static AppConfig appConfig;
     private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
-    private DmaapConsumerTaskImpl dmaapConsumerTask;
+    private DMaaPMessageConsumerTask messageConsumerTask;
     private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
 
-    private static String ftpesMessage;
+    private static String ftpesMessageString;
     private static FileData ftpesFileData;
+    private static FileReadyMessage expectedFtpesMessage;
 
-    private static String sftpMessage;
+    private static String sftpMessageString;
     private static FileData sftpFileData;
+    private static FileReadyMessage expectedSftpMessage;
 
     @BeforeAll
     public static void setUp() {
@@ -129,8 +134,8 @@
                 .addAdditionalField(ftpesAdditionalField)
                 .build();
 
-        ftpesMessage = ftpesJsonMessage.toString();
-        FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+        ftpesMessageString = ftpesJsonMessage.toString();
+        MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
                 .productName(PRODUCT_NAME)
                 .vendorName(VENDOR_NAME)
                 .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -141,14 +146,22 @@
                 .changeType(FILE_READY_CHANGE_TYPE)
                 .build();
         ftpesFileData = ImmutableFileData.builder()
-                .fileMetaData(fileMetaData)
                 .name(PM_FILE_NAME)
                 .location(FTPES_LOCATION)
+                .scheme(Scheme.FTPS)
                 .compression(GZIP_COMPRESSION)
                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
                 .fileFormatVersion(FILE_FORMAT_VERSION)
                 .build();
 
+        List<FileData> files = new ArrayList<>();
+        files.add(ftpesFileData);
+        expectedFtpesMessage = ImmutableFileReadyMessage.builder()
+                .pnfName(SOURCE_NAME)
+                .messageMetaData(messageMetaData)
+                .files(files)
+                .build();
+
         AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder()
                 .location(SFTP_LOCATION)
                 .compression(GZIP_COMPRESSION)
@@ -162,17 +175,16 @@
                 .notificationFieldsVersion("1.0")
                 .addAdditionalField(sftpAdditionalField)
                 .build();
-        sftpMessage = sftpJsonMessage.toString();
+        sftpMessageString = sftpJsonMessage.toString();
         sftpFileData = ImmutableFileData.builder()
-                .fileMetaData(fileMetaData)
                 .name(PM_FILE_NAME)
                 .location(SFTP_LOCATION)
+                .scheme(Scheme.FTPS)
                 .compression(GZIP_COMPRESSION)
                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
                 .fileFormatVersion(FILE_FORMAT_VERSION)
                 .build();
 
-
         ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
                 .productName(PRODUCT_NAME)
                 .vendorName(VENDOR_NAME)
@@ -188,6 +200,14 @@
                 .fileFormatVersion(FILE_FORMAT_VERSION)
                 .build();
         listOfConsumerDmaapModel.add(consumerDmaapModel);
+
+        files = new ArrayList<>();
+        files.add(sftpFileData);
+        expectedSftpMessage = ImmutableFileReadyMessage.builder()
+                .pnfName(SOURCE_NAME)
+                .messageMetaData(messageMetaData)
+                .files(files)
+                .build();
         //@formatter:on
     }
 
@@ -195,17 +215,17 @@
     public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
         prepareMocksForDmaapConsumer("", null);
 
-        StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription()
-                .expectError(DmaapEmptyResponseException.class).verify();
+        StepVerifier.create(messageConsumerTask.execute()).expectSubscription()
+                .expectError(DatafileTaskException.class).verify();
 
         verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
     }
 
     @Test
     public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
-        prepareMocksForDmaapConsumer(ftpesMessage, ftpesFileData);
+        prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage);
 
-        StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(ftpesFileData).verifyComplete();
+        StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedFtpesMessage).verifyComplete();
 
         verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
         verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
@@ -213,30 +233,31 @@
 
     @Test
     public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
-        prepareMocksForDmaapConsumer(sftpMessage, sftpFileData);
+        prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage);
 
-        StepVerifier.create(dmaapConsumerTask.execute(ftpesMessage)).expectNext(sftpFileData).verifyComplete();
+        StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedSftpMessage).verifyComplete();
 
         verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
         verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
     }
 
-    private void prepareMocksForDmaapConsumer(String message, FileData fileDataAfterConsume) {
+    private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) {
         Mono<String> messageAsMono = Mono.just(message);
-        DmaapConsumerJsonParser dmaapConsumerJsonParserMock = mock(DmaapConsumerJsonParser.class);
+        JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
         dmaapConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
         when(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
 
         if (!message.isEmpty()) {
-            when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono)).thenReturn(Flux.just(fileDataAfterConsume));
+            when(jsonMessageParserMock.getMessagesFromJson(messageAsMono))
+                    .thenReturn(Flux.just(fileReadyMessageAfterConsume));
         } else {
-            when(dmaapConsumerJsonParserMock.getJsonObject(messageAsMono))
-                    .thenReturn(Flux.error(new DmaapEmptyResponseException()));
+            when(jsonMessageParserMock.getMessagesFromJson(messageAsMono))
+                    .thenReturn(Flux.error(new DatafileTaskException("problemas")));
         }
 
-        dmaapConsumerTask =
-                spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerReactiveHttpClient, dmaapConsumerJsonParserMock));
-        when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
-        doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
+        messageConsumerTask =
+                spy(new DMaaPMessageConsumerTask(appConfig, dmaapConsumerReactiveHttpClient, jsonMessageParserMock));
+        when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
+        doReturn(dmaapConsumerReactiveHttpClient).when(messageConsumerTask).resolveClient();
     }
 }
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
similarity index 75%
rename from datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java
rename to datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index 5b29bf1..73511d1 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DmaapPublisherTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -25,9 +25,10 @@
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import java.time.Duration;
+
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
@@ -43,7 +44,7 @@
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
  */
-class DmaapPublisherTaskImplTest {
+class DataRouterPublisherTest {
     private static final String PRODUCT_NAME = "NrRadio";
     private static final String VENDOR_NAME = "Ericsson";
     private static final String LAST_EPOCH_MICROSEC = "8745745764578";
@@ -53,7 +54,7 @@
     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
 
     private static ConsumerDmaapModel consumerDmaapModel;
-    private static DmaapPublisherTaskImpl dmaapPublisherTask;
+    private static DataRouterPublisher dmaapPublisherTask;
     private static DmaapProducerReactiveHttpClient dMaaPProducerReactiveHttpClient;
     private static AppConfig appConfig;
     private static DmaapPublisherConfiguration dmaapPublisherConfiguration;
@@ -95,20 +96,44 @@
 
     @Test
     public void whenPassedObjectFits_ReturnsCorrectStatus() {
-        prepareMocksForTests(HttpStatus.OK.value());
+        prepareMocksForTests(Flux.just(HttpStatus.OK));
 
-        StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectNext("200").verifyComplete();
+        StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+                .expectNext(consumerDmaapModel).verifyComplete();
 
         verify(dMaaPProducerReactiveHttpClient, times(1)).getDmaapProducerResponse(any());
         verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
     }
 
-    private void prepareMocksForTests(Integer httpResponseCode) {
+    @Test
+    public void whenPassedObjectFits_firstFailsThenSucceeds() {
+        prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.OK));
+
+        StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+                .expectNext(consumerDmaapModel).verifyComplete();
+
+        verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any());
+        verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+    }
+
+    @Test
+    public void whenPassedObjectFits_firstFailsThenFails() {
+        prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.BAD_GATEWAY));
+
+        StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
+                .expectErrorMessage("Retries exhausted: 1/1").verify();
+
+        verify(dMaaPProducerReactiveHttpClient, times(2)).getDmaapProducerResponse(any());
+        verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+    }
+
+    @SafeVarargs
+    final void prepareMocksForTests(Flux<HttpStatus> firstResponse, Flux<HttpStatus>... nextHttpResponses) {
         dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
-        when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any()))
-                .thenReturn(Flux.just(httpResponseCode.toString()));
+        when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse,
+                nextHttpResponses);
         when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
-        dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));
+        dmaapPublisherTask = spy(new DataRouterPublisher(appConfig));
         when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
         doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
     }
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
index 55fa639..10c5b16 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -16,31 +16,30 @@
 
 package org.onap.dcaegen2.collectors.datafile.tasks;
 
-import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import java.io.File;
+import java.nio.file.Path;
+import java.time.Duration;
 
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig;
-import org.onap.dcaegen2.collectors.datafile.ftp.ErrorData;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileCollectResult;
-import org.onap.dcaegen2.collectors.datafile.ftp.FileServerData;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient;
-import org.onap.dcaegen2.collectors.datafile.ftp.ImmutableFileServerData;
+import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
 import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
-import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
-import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
+import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
 
 import reactor.test.StepVerifier;
 
@@ -63,7 +62,7 @@
     private static final int PORT_22 = 22;
     private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
     private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
-    private static final String LOCAL_FILE_LOCATION = "target" + File.separator + PM_FILE_NAME;
+    private static final Path LOCAL_FILE_LOCATION = FileData.createLocalFileName(SERVER_ADDRESS, PM_FILE_NAME);
     private static final String USER = "usr";
     private static final String PWD = "pwd";
     private static final String FTPES_LOCATION =
@@ -84,9 +83,11 @@
     private FtpsClient ftpsClientMock = mock(FtpsClient.class);
 
     private SftpClient sftpClientMock = mock(SftpClient.class);
-    private RetryTimer retryTimerMock = mock(RetryTimer.class);
-    // @formatter:off
-    private FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+
+
+    private MessageMetaData createMessageMetaData() {
+        // @formatter:off
+        return ImmutableMessageMetaData.builder()
                 .productName(PRODUCT_NAME)
                 .vendorName(VENDOR_NAME)
                 .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
@@ -95,8 +96,41 @@
                 .timeZoneOffset(TIME_ZONE_OFFSET)
                 .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
                 .changeType(FILE_READY_CHANGE_TYPE)
-                .build();;
-                // @formatter:on
+                .build();
+        // @formatter:on
+    }
+
+    private FileData createFileData() {
+        // @formatter:off
+        return  ImmutableFileData.builder()
+            .name(PM_FILE_NAME)
+            .location(FTPES_LOCATION)
+            .compression(GZIP_COMPRESSION)
+            .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+            .fileFormatVersion(FILE_FORMAT_VERSION)
+            .scheme(Scheme.FTPS)
+            .build();
+        // @formatter:on
+    }
+
+    private ConsumerDmaapModel createExpectedConsumerDmaapModel() {
+        // @formatter:off
+        return ImmutableConsumerDmaapModel.builder()
+            .productName(PRODUCT_NAME)
+            .vendorName(VENDOR_NAME)
+            .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
+            .sourceName(SOURCE_NAME)
+            .startEpochMicrosec(START_EPOCH_MICROSEC)
+            .timeZoneOffset(TIME_ZONE_OFFSET)
+            .name(PM_FILE_NAME)
+            .location(FTPES_LOCATION)
+            .internalLocation(LOCAL_FILE_LOCATION.toString())
+            .compression(GZIP_COMPRESSION)
+            .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
+            .fileFormatVersion(FILE_FORMAT_VERSION)
+            .build();
+      // @formatter:on
+    }
 
     @BeforeAll
     public static void setUpConfiguration() {
@@ -108,51 +142,18 @@
     }
 
     @Test
-    public void whenFtpesFile_returnCorrectResponse() {
-        XnfCollectorTaskImpl collectorUndetTest =
-                new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
+    public void whenFtpesFile_returnCorrectResponse() throws Exception {
+        FileCollector collectorUndetTest =
+                new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
 
-       // @formatter:off
-        FileData fileData = ImmutableFileData.builder()
-                .fileMetaData(fileMetaData)
-                .name(PM_FILE_NAME)
-                .location(FTPES_LOCATION)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
-                .build();
+        FileData fileData = createFileData();
 
-        FileServerData fileServerData = ImmutableFileServerData.builder()
-                .serverAddress(SERVER_ADDRESS)
-                .userId(USER)
-                .password(PWD)
-                .port(PORT_22)
-                .build();
-        // @formatter:on
-        when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
-                .thenReturn(new FileCollectResult());
+        ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel();
 
-        // @formatter:off
-        ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder()
-                .productName(PRODUCT_NAME)
-                .vendorName(VENDOR_NAME)
-                .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
-                .sourceName(SOURCE_NAME)
-                .startEpochMicrosec(START_EPOCH_MICROSEC)
-                .timeZoneOffset(TIME_ZONE_OFFSET)
-                .name(PM_FILE_NAME)
-                .location(FTPES_LOCATION)
-                .internalLocation(LOCAL_FILE_LOCATION)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
-                .build();
-        // @formatter:on
+        StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+                .expectNext(expectedConsumerDmaapModel).verifyComplete();
 
-        StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
-                .verifyComplete();
-
-        verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+        verify(ftpsClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
         verify(ftpsClientMock).setKeyCertPath(FTP_KEY_PATH);
         verify(ftpsClientMock).setKeyCertPassword(FTP_KEY_PASSWORD);
         verify(ftpsClientMock).setTrustedCAPath(TRUSTED_CA_PATH);
@@ -161,30 +162,19 @@
     }
 
     @Test
-    public void whenSftpFile_returnCorrectResponse() {
-        XnfCollectorTaskImpl collectorUndetTest =
-                new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
+    public void whenSftpFile_returnCorrectResponse() throws Exception {
+        FileCollector collectorUndetTest =
+                new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
         // @formatter:off
         FileData fileData = ImmutableFileData.builder()
-                .fileMetaData(fileMetaData)
                 .name(PM_FILE_NAME)
                 .location(SFTP_LOCATION)
                 .compression(GZIP_COMPRESSION)
                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
                 .fileFormatVersion(FILE_FORMAT_VERSION)
+                .scheme(Scheme.SFTP)
                 .build();
 
-
-        FileServerData fileServerData = ImmutableFileServerData.builder()
-                .serverAddress(SERVER_ADDRESS)
-                .userId("")
-                .password("")
-                .port(PORT_22)
-                .build();
-        // @formatter:on
-        when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
-                .thenReturn(new FileCollectResult());
-        // @formatter:off
         ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder()
                 .productName(PRODUCT_NAME)
                 .vendorName(VENDOR_NAME)
@@ -194,130 +184,48 @@
                 .timeZoneOffset(TIME_ZONE_OFFSET)
                 .name(PM_FILE_NAME)
                 .location(SFTP_LOCATION)
-                .internalLocation(LOCAL_FILE_LOCATION)
+                .internalLocation(LOCAL_FILE_LOCATION.toString())
                 .compression(GZIP_COMPRESSION)
                 .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
                 .fileFormatVersion(FILE_FORMAT_VERSION)
                 .build();
         // @formatter:on
-        StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
-                .verifyComplete();
 
-        verify(sftpClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
+        StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+                .expectNext(expectedConsumerDmaapModel).verifyComplete();
+
+        verify(sftpClientMock, times(1)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
         verifyNoMoreInteractions(sftpClientMock);
     }
 
     @Test
-    public void whenFtpesFileAlwaysFail_retryAndReturnEmpty() {
-        XnfCollectorTaskImpl collectorUndetTest =
-                new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
-        collectorUndetTest.setRetryTimer(retryTimerMock);
-        // @formatter:off
-        FileData fileData = ImmutableFileData.builder()
-                .fileMetaData(fileMetaData)
-                .name(PM_FILE_NAME)
-                .location(FTPES_LOCATION)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
-                .build();
+    public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception {
+        FileCollector collectorUndetTest =
+                new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
+        FileData fileData = createFileData();
+        doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
+                .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
 
-        FileServerData fileServerData = ImmutableFileServerData.builder()
-                .serverAddress(SERVER_ADDRESS)
-                .userId(USER)
-                .password(PWD)
-                .port(PORT_22)
-                .build();
-        // @formatter:on
-        ErrorData errorData = new ErrorData();
-        errorData.addError("Unable to collect file.", new Exception());
-        when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
-                .thenReturn(new FileCollectResult(errorData));
-        doReturn(new FileCollectResult(errorData)).when(ftpsClientMock).retryCollectFile();
+        StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+                .expectErrorMessage("Retries exhausted: 3/3").verify();
 
-        StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete();
-
-        verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
-        verify(ftpsClientMock, times(2)).retryCollectFile();
+        verify(ftpsClientMock, times(4)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
     }
 
     @Test
-    public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() {
-        XnfCollectorTaskImpl collectorUndetTest =
-                new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
-        collectorUndetTest.setRetryTimer(retryTimerMock);
-        // @formatter:off
-        FileData fileData = ImmutableFileData.builder()
-                .fileMetaData(fileMetaData)
-                .name(PM_FILE_NAME)
-                .location(FTPES_LOCATION)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
-                .build();
+    public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception {
+        FileCollector collectorUndetTest =
+                new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
+        doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock)
+                .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
 
-        FileServerData fileServerData = ImmutableFileServerData.builder()
-                .serverAddress(SERVER_ADDRESS)
-                .userId(USER)
-                .password(PWD)
-                .port(PORT_22)
-                .build();
-        // @formatter:on
-        ErrorData errorData = new ErrorData();
-        errorData.addError("Unable to collect file.", new Exception());
-        when(ftpsClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
-                .thenReturn(new FileCollectResult(errorData));
-        doReturn(new FileCollectResult()).when(ftpsClientMock).retryCollectFile();
-        // @formatter:off
-        ConsumerDmaapModel expectedConsumerDmaapModel = ImmutableConsumerDmaapModel.builder()
-                .productName(PRODUCT_NAME)
-                .vendorName(VENDOR_NAME)
-                .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
-                .sourceName(SOURCE_NAME)
-                .startEpochMicrosec(START_EPOCH_MICROSEC)
-                .timeZoneOffset(TIME_ZONE_OFFSET)
-                .name(PM_FILE_NAME)
-                .location(FTPES_LOCATION)
-                .internalLocation(LOCAL_FILE_LOCATION)
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
-                .build();
-        // @formatter:on
-        StepVerifier.create(collectorUndetTest.execute(fileData)).expectNext(expectedConsumerDmaapModel)
-                .verifyComplete();
+        ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel();
 
+        FileData fileData = createFileData();
+        StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
+                .expectNext(expectedConsumerDmaapModel).verifyComplete();
 
-        verify(ftpsClientMock, times(1)).collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
-        verify(ftpsClientMock, times(1)).retryCollectFile();
+        verify(ftpsClientMock, times(2)).collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
     }
 
-    @Test
-    public void whenWrongScheme_returnEmpty() {
-        XnfCollectorTaskImpl collectorUndetTest =
-                new XnfCollectorTaskImpl(appConfigMock, ftpsClientMock, sftpClientMock);
-        // @formatter:off
-        FileData fileData = ImmutableFileData.builder()
-                .fileMetaData(fileMetaData)
-                .name(PM_FILE_NAME)
-                .location("http://host.com/file.zip")
-                .compression(GZIP_COMPRESSION)
-                .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
-                .fileFormatVersion(FILE_FORMAT_VERSION)
-                .build();
-
-        FileServerData fileServerData = ImmutableFileServerData.builder()
-                .serverAddress(SERVER_ADDRESS)
-                .userId("")
-                .password("")
-                .port(PORT_22)
-                .build();
-        // @formatter:on
-        when(sftpClientMock.collectFile(fileServerData, REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION))
-                .thenReturn(new FileCollectResult());
-
-        StepVerifier.create(collectorUndetTest.execute(fileData)).expectNextCount(0).verifyComplete();
-
-        verifyNoMoreInteractions(sftpClientMock);
-    }
 }
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java
index 76c33bb..733aa3e 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/utils/JsonMessage.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -78,7 +78,6 @@
                 + "\"version\":3"
                 + "},"
                 + "\"notificationFields\":{"
-        // @formatter:on
                 + getAsStringIfParameterIsSet("changeIdentifier", changeIdentifier,
                         changeType != null || notificationFieldsVersion != null || arrayOfAdditionalFields.size() > 0)
                 + getAsStringIfParameterIsSet("changeType", changeType,
@@ -86,6 +85,7 @@
                 + getAsStringIfParameterIsSet("notificationFieldsVersion", notificationFieldsVersion,
                         arrayOfAdditionalFields.size() > 0)
                 + additionalFieldsString.toString() + "}" + "}" + "}";
+        // @formatter:on
     }
 
     private JsonMessage(final JsonMessageBuilder builder) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
similarity index 89%
rename from datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
rename to datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
index 7a04710..ae1435c 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/exceptions/DatafileTaskException.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,8 +25,8 @@
 
     private static final long serialVersionUID = 1L;
 
-    public DatafileTaskException() {
-        super();
+    public DatafileTaskException(Exception e) {
+        super(e);
     }
 
     public DatafileTaskException(String message) {
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
index 801f170..9f3a318 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctions.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,14 +20,15 @@
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder;
 
 
-public class CommonFunctions implements JsonBodyBuilder<ConsumerDmaapModel> {
+public class CommonFunctions {
 
     private static Gson gson = new GsonBuilder().serializeNulls().create();
 
-    public  String createJsonBody(ConsumerDmaapModel consumerDmaapModel) {
+    private CommonFunctions() {}
+
+    public static String createJsonBody(ConsumerDmaapModel consumerDmaapModel) {
         return gson.toJson(consumerDmaapModel);
     }
-}
+}
\ No newline at end of file
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
index 883a73a..972316b 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModel.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -17,7 +17,6 @@
 package org.onap.dcaegen2.collectors.datafile.model;
 
 import com.google.gson.annotations.SerializedName;
-
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
 import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileMetaData.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileMetaData.java
index c3e7c15..c50148b 100644
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileMetaData.java
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/FileMetaData.java
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java
new file mode 100644
index 0000000..012de74
--- /dev/null
+++ b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/MessageMetaData.java
@@ -0,0 +1,45 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.model;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ */
+@Value.Immutable
+@Gson.TypeAdapters
+public interface MessageMetaData {
+    public String productName();
+
+    public String vendorName();
+
+    public String lastEpochMicrosec();
+
+    public String sourceName();
+
+    public String startEpochMicrosec();
+
+    public String timeZoneOffset();
+
+    public String changeIdentifier();
+
+    public String changeType();
+}
diff --git a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtils.java b/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtils.java
deleted file mode 100644
index 91cc3c6..0000000
--- a/datafile-commons/src/main/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtils.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.model.utils;
-
-import org.springframework.http.HttpStatus;
-
-public final class HttpUtils {
-
-    private HttpUtils() {}
-
-    public static boolean isSuccessfulResponseCode(Integer statusCode) {
-        return statusCode >= HttpStatus.OK.value() && statusCode < HttpStatus.MULTIPLE_CHOICES.value();
-    }
-}
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
index cb6c48d..cbc3e12 100644
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
+++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/CommonFunctionsTest.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -36,7 +36,7 @@
             .fileFormatType("org.3GPP.32.435#measCollec")
             .fileFormatVersion("V10")
             .build();
-    
+
     private static final String EXPECTED_RESULT =
              "{\"productName\":\"NrRadio\","
             + "\"vendorName\":\"Ericsson\","
@@ -53,6 +53,6 @@
     // @formatter:on
     @Test
     void createJsonBody_shouldReturnJsonInString() {
-        assertEquals(EXPECTED_RESULT, new CommonFunctions().createJsonBody(model));
+        assertEquals(EXPECTED_RESULT, CommonFunctions.createJsonBody(model));
     }
-}
+}
\ No newline at end of file
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
index 21a2750..2c5e701 100644
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
+++ b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/ConsumerDmaapModelTest.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
diff --git a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtilsTest.java b/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtilsTest.java
deleted file mode 100644
index 8effcbb..0000000
--- a/datafile-commons/src/test/java/org/onap/dcaegen2/collectors/datafile/model/utils/HttpUtilsTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.model.utils;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import org.junit.jupiter.api.Test;
-
-
-public class HttpUtilsTest {
-
-    @Test
-    public void isSuccessfulResponseCode_shouldReturnTrue() {
-        assertTrue(HttpUtils.isSuccessfulResponseCode(202));
-    }
-
-    @Test
-    public void isSuccessfulResponseCode_shouldReturnFalse() {
-        assertFalse(HttpUtils.isSuccessfulResponseCode(502));
-    }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java
deleted file mode 100644
index c62f349..0000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorData.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class ErrorData {
-    private List<String> errorMessages = new ArrayList<>();
-    private List<Throwable> errorCauses = new ArrayList<>();
-
-    public void addError(String errorMessage, Throwable errorCause) {
-        errorMessages.add(errorMessage);
-        errorCauses.add(errorCause);
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder message = new StringBuilder();
-        for (int i = 0; i < errorMessages.size(); i++) {
-            message.append(errorMessages.get(i));
-            if (errorCauses.get(i) != null) {
-                message.append(" Cause: ").append(errorCauses.get(i));
-            }
-            if (i < errorMessages.size() -1) {
-                message.append("\n");
-            }
-        }
-        return message.toString();
-    }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
index 4b7cc01..29160c9 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -18,11 +18,10 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
-
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.TrustManager;
-
 import org.apache.commons.net.ftp.FTPSClient;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 
 public class FTPSClientWrapper implements IFTPSClient {
     private FTPSClient ftpsClient = new FTPSClient();
@@ -88,8 +87,14 @@
     }
 
     @Override
-    public boolean retrieveFile(String remote, OutputStream local) throws IOException {
-        return ftpsClient.retrieveFile(remote, local);
+    public void retrieveFile(String remote, OutputStream local) throws DatafileTaskException {
+        try {
+            if (!ftpsClient.retrieveFile(remote, local)) {
+                throw new DatafileTaskException("could not retrieve file");
+            }
+        } catch (IOException e) {
+            throw new DatafileTaskException(e);
+        }
     }
 
     @Override
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
index 42addbf..f330b67 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectClient.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -16,37 +16,12 @@
 
 package org.onap.dcaegen2.collectors.datafile.ftp;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.nio.file.Path;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 
 /**
  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
  */
-public abstract class FileCollectClient {
-    protected static final Logger logger = LoggerFactory.getLogger(FtpsClient.class);
-
-    protected FileServerData fileServerData;
-    protected String remoteFile;
-    protected String localFile;
-    protected ErrorData errorData;
-
-    public FileCollectResult collectFile(FileServerData fileServerData, String remoteFile, String localFile) {
-        logger.trace("collectFile called with fileServerData: {}, remoteFile: {}, localFile: {}", fileServerData,
-                remoteFile, localFile);
-
-        this.fileServerData = fileServerData;
-        this.remoteFile = remoteFile;
-        this.localFile = localFile;
-
-        return retryCollectFile();
-    }
-
-    public abstract FileCollectResult retryCollectFile();
-
-    protected void addError(String errorMessage, Throwable errorCause) {
-        if (errorData == null) {
-            errorData = new ErrorData();
-        }
-        errorData.addError(errorMessage, errorCause);
-    }
+public interface FileCollectClient {
+    public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException;
 }
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java
deleted file mode 100644
index fa1d431..0000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-public class FileCollectResult {
-    private boolean result;
-    private ErrorData errorData;
-
-    public FileCollectResult() {
-        this.result = true;
-    }
-
-    public FileCollectResult(ErrorData errorData) {
-        this.errorData = errorData;
-        result = false;
-    }
-
-    public boolean downloadSuccessful() {
-        return result;
-    }
-
-    public String getErrorData() {
-        if (errorData != null) {
-            return errorData.toString();
-        }
-        return "";
-    }
-
-    @Override
-    public String toString() {
-        return "FileCollectResult: "
-                + (downloadSuccessful() ? "successful!" : "unsuccessful! Error data: " + getErrorData());
-    }
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
index d4eca4d..b080c32 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileServerData.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -16,6 +16,8 @@
 
 package org.onap.dcaegen2.collectors.datafile.ftp;
 
+import java.util.Optional;
+
 import org.immutables.value.Value;
 
 /**
@@ -27,5 +29,5 @@
     public String serverAddress();
     public String userId();
     public String password();
-    public int port();
+    public Optional<Integer> port();
 }
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index 0d055fc..461b220 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -19,17 +19,20 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
+import java.util.Optional;
 
 import org.apache.commons.net.ftp.FTP;
 import org.apache.commons.net.ftp.FTPReply;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
 import org.onap.dcaegen2.collectors.datafile.io.FileWrapper;
 import org.onap.dcaegen2.collectors.datafile.io.IFile;
 import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
 import org.onap.dcaegen2.collectors.datafile.io.IOutputStream;
-import org.onap.dcaegen2.collectors.datafile.io.OutputStreamWrapper;
 import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils;
 import org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils.KeyManagerException;
 import org.onap.dcaegen2.collectors.datafile.ssl.IKeyStore;
@@ -37,118 +40,106 @@
 import org.onap.dcaegen2.collectors.datafile.ssl.KeyManagerUtilsWrapper;
 import org.onap.dcaegen2.collectors.datafile.ssl.KeyStoreWrapper;
 import org.onap.dcaegen2.collectors.datafile.ssl.TrustManagerFactoryWrapper;
-import org.springframework.stereotype.Component;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Gets file from xNF with FTPS protocol.
  *
  * @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
  */
-@Component
-public class FtpsClient extends FileCollectClient {
+public class FtpsClient implements FileCollectClient {
+    private static final Logger logger = LoggerFactory.getLogger(FtpsClient.class);
     private String keyCertPath;
     private String keyCertPassword;
-    private String trustedCAPath;
+    private Path trustedCAPath;
     private String trustedCAPassword;
 
-    private IFTPSClient realFtpsClient;
-    private IKeyManagerUtils kmu;
+    private IFTPSClient realFtpsClient = new FTPSClientWrapper();
+    private IKeyManagerUtils keyManagerUtils = new KeyManagerUtilsWrapper();
     private IKeyStore keyStore;
     private ITrustManagerFactory trustManagerFactory;
-    private IFile lf;
-    private IFileSystemResource fileResource;
-    private IOutputStream os;
+    private IFile localFile = new FileWrapper();
+    private IFileSystemResource fileSystemResource = new FileSystemResourceWrapper();
+    private IOutputStream outputStream;
     private boolean keyManagerSet = false;
     private boolean trustManagerSet = false;
+    private final FileServerData fileServerData;
 
-    @Override
-    public FileCollectResult retryCollectFile() {
-        logger.trace("retryCollectFile called");
 
-        FileCollectResult fileCollectResult;
-
-        IFTPSClient ftps = getFtpsClient();
-
-        ftps.setNeedClientAuth(true);
-
-        if (setUpKeyManager(ftps) && setUpTrustedCA(ftps) && setUpConnection(ftps)) {
-            if (getFileFromxNF(ftps)) {
-                fileCollectResult = new FileCollectResult();
-            } else {
-                fileCollectResult = new FileCollectResult(errorData);
-            }
-        } else {
-            fileCollectResult = new FileCollectResult(errorData);
-        }
-        closeDownConnection(ftps);
-        logger.trace("retryCollectFile left with result: {}", fileCollectResult);
-        return fileCollectResult;
+    public FtpsClient(FileServerData fileServerData) {
+        this.fileServerData = fileServerData;
     }
 
-    private boolean setUpKeyManager(IFTPSClient ftps) {
-        boolean result = true;
+    @Override
+    public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+        logger.trace("collectFile called");
+
+        try {
+            realFtpsClient.setNeedClientAuth(true);
+            setUpKeyManager(realFtpsClient);
+            setUpTrustedCA(realFtpsClient);
+            setUpConnection(realFtpsClient);
+            getFileFromxNF(realFtpsClient, remoteFile, localFile);
+        } catch (IOException e) {
+            logger.trace("", e);
+            throw new DatafileTaskException("Could not open connection: " + e);
+        } catch (KeyManagerException e) {
+            logger.trace("", e);
+            throw new DatafileTaskException(e);
+        } finally {
+            closeDownConnection(realFtpsClient);
+        }
+        logger.trace("collectFile fetched: {}", localFile);
+    }
+
+    private void setUpKeyManager(IFTPSClient ftps) throws KeyManagerException {
         if (keyManagerSet) {
             logger.trace("keyManager already set!");
-            return result;
-        }
-        try {
-            IKeyManagerUtils keyManagerUtils = getKeyManagerUtils();
+        } else {
             keyManagerUtils.setCredentials(keyCertPath, keyCertPassword);
             ftps.setKeyManager(keyManagerUtils.getClientKeyManager());
             keyManagerSet = true;
-        } catch (KeyManagerException e) {
-            addError("Unable to use own key store " + keyCertPath, e);
-            result = false;
         }
         logger.trace("complete setUpKeyManager");
-        return result;
     }
 
-    private boolean setUpTrustedCA(IFTPSClient ftps) {
-        boolean result = true;
+    private void setUpTrustedCA(IFTPSClient ftps) throws DatafileTaskException {
         if (trustManagerSet) {
             logger.trace("trustManager already set!");
-            return result;
-        }
-        try {
-            IFileSystemResource fileSystemResource = getFileSystemResource();
-            fileSystemResource.setPath(trustedCAPath);
-            InputStream fis = fileSystemResource.getInputStream();
-            IKeyStore ks = getKeyStore();
-            ks.load(fis, trustedCAPassword.toCharArray());
-            fis.close();
-            ITrustManagerFactory tmf = getTrustManagerFactory();
-            tmf.init(ks.getKeyStore());
-            ftps.setTrustManager(tmf.getTrustManagers()[0]);
-            trustManagerSet = true;
-
-        } catch (Exception e) {
-            addError("Unable to trust xNF's CA, " + trustedCAPath, e);
-            result = false;
+        } else {
+            try {
+                fileSystemResource.setPath(trustedCAPath);
+                InputStream fis = fileSystemResource.getInputStream();
+                IKeyStore ks = getKeyStore();
+                ks.load(fis, trustedCAPassword.toCharArray());
+                fis.close();
+                ITrustManagerFactory tmf = getTrustManagerFactory();
+                tmf.init(ks.getKeyStore());
+                ftps.setTrustManager(tmf.getTrustManagers()[0]);
+                trustManagerSet = true;
+            } catch (Exception e) {
+                throw new DatafileTaskException("Unable to trust xNF's CA, " + trustedCAPath + " " + e);
+            }
         }
         logger.trace("complete setUpTrustedCA");
-        return result;
     }
 
-    private boolean setUpConnection(IFTPSClient ftps) {
-        boolean result = true;
-        try {
-            if (ftps.isConnected()) {
-                addError(
-                        "Looks like previous ftp connection is still in use, will retry in 1 minute. " + fileServerData,
-                        null);
-                return false;
-            }
-            ftps.connect(fileServerData.serverAddress(), fileServerData.port());
+    private int getPort(Optional<Integer> port) {
+        final int FTPS_DEFAULT_PORT = 21;
+        return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
+    }
+
+    private void setUpConnection(IFTPSClient ftps) throws DatafileTaskException, IOException {
+        if (!ftps.isConnected()) {
+            ftps.connect(fileServerData.serverAddress(), getPort(fileServerData.port()));
             logger.trace("after ftp connect");
-            boolean loginSuccesful = ftps.login(fileServerData.userId(), fileServerData.password());
-            if (!loginSuccesful) {
-                closeDownConnection(ftps);
-                addError("Unable to log in to xNF. " + fileServerData, null);
-                return false;
+
+            if (!ftps.login(fileServerData.userId(), fileServerData.password())) {
+                throw new DatafileTaskException("Unable to log in to xNF. " + fileServerData.serverAddress());
             }
 
-            if (loginSuccesful && FTPReply.isPositiveCompletion(ftps.getReplyCode())) {
+            if (FTPReply.isPositiveCompletion(ftps.getReplyCode())) {
                 ftps.enterLocalPassiveMode();
                 ftps.setFileType(FTP.BINARY_FILE_TYPE);
                 // Set protection buffer size
@@ -157,54 +148,29 @@
                 ftps.execPROT("P");
                 ftps.setBufferSize(1024 * 1024);
             } else {
-                closeDownConnection(ftps);
-                addError("Unable to connect to xNF. " + fileServerData + " xNF reply code: " + ftps.getReplyCode(),
-                        null);
-                return false;
+                throw new DatafileTaskException("Unable to connect to xNF. " + fileServerData.serverAddress()
+                        + " xNF reply code: " + ftps.getReplyCode());
             }
-        } catch (Exception e) {
-            logger.trace("connect to ftp server failed.", e);
-            addError("Unable to connect to xNF. Data: " + fileServerData, e);
-            closeDownConnection(ftps);
-            return false;
         }
         logger.trace("setUpConnection successfully!");
-        return result;
     }
 
-    private boolean getFileFromxNF(IFTPSClient ftps) {
+    private void getFileFromxNF(IFTPSClient ftps, String remoteFileName, Path localFileName)
+            throws IOException, DatafileTaskException {
         logger.trace("starting to getFile");
-        boolean result = true;
-        IFile outfile = getFile();
-        try {
-            outfile.setPath(localFile);
-            outfile.createNewFile();
 
-            IOutputStream outputStream = getOutputStream();
-            OutputStream output = outputStream.getOutputStream(outfile.getFile());
-            logger.trace("begin to retrieve from xNF.");
-            result = ftps.retrieveFile(remoteFile, output);
-            logger.trace("end retrieve from xNF.");
-            if (!result) {
-                output.close();
-                logger.debug("Unable to retrieve file from xNF. Cause unknown!");
-                addError("Unable to retrieve file from xNF. Cause unknown!", null);
-                return result;
-            }
-            output.close();
-            logger.debug("File {} Download Successfull from xNF", localFile);
-        } catch (IOException ex) {
-            addError("Unable to collect file from xNF. Data: " + fileServerData, ex);
-            try {
-                outfile.delete();
-            } catch (Exception e) {
-                logger.trace("Unable to delete file {}.", localFile, e);
-            }
-            return false;
-        }
-        return result;
+        this.localFile.setPath(localFileName);
+        this.localFile.createNewFile();
+
+        OutputStream output = this.outputStream.getOutputStream(this.localFile.getFile());
+        logger.trace("begin to retrieve from xNF.");
+        ftps.retrieveFile(remoteFileName, output);
+        logger.trace("end retrieve from xNF.");
+        output.close();
+        logger.debug("File {} Download Successfull from xNF", localFileName);
     }
 
+
     private void closeDownConnection(IFTPSClient ftps) {
         logger.trace("starting to closeDownConnection");
         if (ftps != null && ftps.isConnected()) {
@@ -232,7 +198,7 @@
     }
 
     public void setTrustedCAPath(String trustedCAPath) {
-        this.trustedCAPath = trustedCAPath;
+        this.trustedCAPath = Paths.get(trustedCAPath);
     }
 
     public void setTrustedCAPassword(String trustedCAPassword) {
@@ -246,21 +212,6 @@
         return trustManagerFactory;
     }
 
-    private IFTPSClient getFtpsClient() {
-        if (realFtpsClient == null) {
-            realFtpsClient = new FTPSClientWrapper();
-        }
-        return realFtpsClient;
-    }
-
-    private IKeyManagerUtils getKeyManagerUtils() {
-        if (kmu == null) {
-            kmu = new KeyManagerUtilsWrapper();
-        }
-
-        return kmu;
-    }
-
     private IKeyStore getKeyStore() throws KeyStoreException {
         if (keyStore == null) {
             keyStore = new KeyStoreWrapper();
@@ -269,54 +220,31 @@
         return keyStore;
     }
 
-    private IFile getFile() {
-        if (lf == null) {
-            lf = new FileWrapper();
-        }
-
-        return lf;
-    }
-
-    private IOutputStream getOutputStream() {
-        if (os == null) {
-            os = new OutputStreamWrapper();
-        }
-
-        return os;
-    }
-
-    private IFileSystemResource getFileSystemResource() {
-        if (fileResource == null) {
-            fileResource = new FileSystemResourceWrapper();
-        }
-        return fileResource;
-    }
-
-    protected void setFtpsClient(IFTPSClient ftpsClient) {
+    void setFtpsClient(IFTPSClient ftpsClient) {
         this.realFtpsClient = ftpsClient;
     }
 
-    protected void setKeyManagerUtils(IKeyManagerUtils keyManagerUtils) {
-        this.kmu = keyManagerUtils;
+    void setKeyManagerUtils(IKeyManagerUtils keyManagerUtils) {
+        this.keyManagerUtils = keyManagerUtils;
     }
 
-    protected void setKeyStore(IKeyStore keyStore) {
+    void setKeyStore(IKeyStore keyStore) {
         this.keyStore = keyStore;
     }
 
-    protected void setTrustManagerFactory(ITrustManagerFactory tmf) {
+    void setTrustManagerFactory(ITrustManagerFactory tmf) {
         trustManagerFactory = tmf;
     }
 
-    protected void setFile(IFile file) {
-        lf = file;
+    void setFile(IFile file) {
+        localFile = file;
     }
 
-    protected void setOutputStream(IOutputStream outputStream) {
-        os = outputStream;
+    void setOutputStream(IOutputStream outputStream) {
+        this.outputStream = outputStream;
     }
 
-    protected void setFileSystemResource(IFileSystemResource fileSystemResource) {
-        fileResource = fileSystemResource;
+    void setFileSystemResource(IFileSystemResource fileSystemResource) {
+        this.fileSystemResource = fileSystemResource;
     }
 }
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
index 1a58163..3dcaa65 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -18,9 +18,9 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
-
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.TrustManager;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 
 public interface IFTPSClient {
     public void setNeedClientAuth(boolean isNeedClientAuth);
@@ -51,7 +51,7 @@
 
     public void execPROT(String prot) throws IOException;
 
-    public boolean retrieveFile(String remote, OutputStream local) throws IOException;
+    public void retrieveFile(String remote, OutputStream local) throws DatafileTaskException;
 
     void setTimeout(Integer t);
 }
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java
new file mode 100644
index 0000000..d469da6
--- /dev/null
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/Scheme.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+/**
+ * Enum specifying the schemes that DFC support for downloading files.
+ *
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public enum Scheme {
+    FTPS, SFTP;
+
+    /**
+     * Get a <code>Scheme</code> from a string.
+     *
+     * @param schemeString the string to convert to <code>Scheme</code>.
+     * @return The corresponding <code>Scheme</code>
+     * @throws Exception if the value of the string doesn't match any defined scheme.
+     */
+    public static Scheme getSchemeFromString(String schemeString) throws DatafileTaskException {
+        Scheme result;
+        if ("FTPS".equalsIgnoreCase(schemeString) || "FTPES".equalsIgnoreCase(schemeString)) {
+            result = Scheme.FTPS;
+        } else if ("SFTP".equalsIgnoreCase(schemeString)) {
+            result = Scheme.SFTP;
+        } else {
+            throw new DatafileTaskException("DFC does not support protocol " + schemeString
+                    + ". Supported protocols are FTPES , FTPS, and SFTP");
+        }
+        return result;
+    }
+}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
index e8fc695..0c6491b 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClient.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -21,10 +21,13 @@
 import com.jcraft.jsch.JSch;
 import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
-import com.jcraft.jsch.SftpException;
 
-import org.apache.commons.io.FilenameUtils;
-import org.springframework.stereotype.Component;
+import java.nio.file.Path;
+import java.util.Optional;
+
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Gets file from xNF with SFTP protocol.
@@ -32,65 +35,52 @@
  * @author <a href="mailto:martin.c.yan@est.tech">Martin Yan</a>
  *
  */
-@Component
-public class SftpClient extends FileCollectClient {
-    @Override
-    public FileCollectResult retryCollectFile() {
-        logger.trace("retryCollectFile called");
+public class SftpClient implements FileCollectClient {
+    private static final Logger logger = LoggerFactory.getLogger(SftpClient.class);
+    private final FileServerData fileServerData;
 
-        FileCollectResult result;
-        Session session = setUpSession(fileServerData);
-
-        if (session != null) {
-            ChannelSftp sftpChannel = getChannel(session, fileServerData);
-            if (sftpChannel != null) {
-                try {
-                    sftpChannel.get(remoteFile, localFile);
-                    result = new FileCollectResult();
-                    logger.debug("File {} Download Successfull from xNF", FilenameUtils.getName(localFile));
-                } catch (SftpException e) {
-                    addError("Unable to get file from xNF. Data: " + fileServerData, e);
-                    result = new FileCollectResult(errorData);
-                }
-
-                sftpChannel.exit();
-            } else {
-                result = new FileCollectResult(errorData);
-            }
-            session.disconnect();
-        } else {
-            result = new FileCollectResult(errorData);
-        }
-        logger.trace("retryCollectFile left with result: {}", result);
-        return result;
+    public SftpClient(FileServerData fileServerData) {
+        this.fileServerData = fileServerData;
     }
 
-    private Session setUpSession(FileServerData fileServerData) {
+    @Override
+    public void collectFile(String remoteFile, Path localFile) throws DatafileTaskException {
+        logger.trace("collectFile called");
+
+        try {
+            Session session = setUpSession(fileServerData);
+            ChannelSftp sftpChannel = getChannel(session);
+            sftpChannel.get(remoteFile, localFile.toString());
+            logger.debug("File {} Download Successfull from xNF", localFile.getFileName());
+            sftpChannel.exit();
+            session.disconnect();
+        } catch (Exception e) {
+            throw new DatafileTaskException("Unable to get file from xNF. Data: " + fileServerData + e);
+        }
+
+        logger.trace("collectFile OK");
+
+    }
+
+    private int getPort(Optional<Integer> port) {
+        final int FTPS_DEFAULT_PORT = 22;
+        return port.isPresent() ? port.get() : FTPS_DEFAULT_PORT;
+    }
+
+    private Session setUpSession(FileServerData fileServerData) throws JSchException {
         JSch jsch = new JSch();
 
-        Session session = null;
-        try {
-            session = jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), fileServerData.port());
-            session.setConfig("StrictHostKeyChecking", "no");
-            session.setPassword(fileServerData.password());
-            session.connect();
-        } catch (JSchException e) {
-            addError("Unable to set up SFTP connection to xNF. Data: " + fileServerData, e);
-            session = null;
-        }
+        Session session =
+                jsch.getSession(fileServerData.userId(), fileServerData.serverAddress(), getPort(fileServerData.port()));
+        session.setConfig("StrictHostKeyChecking", "no");
+        session.setPassword(fileServerData.password());
+        session.connect();
         return session;
     }
 
-    private ChannelSftp getChannel(Session session, FileServerData fileServerData) {
-        ChannelSftp sftpChannel = null;
-        try {
-            Channel channel;
-            channel = session.openChannel("sftp");
-            channel.connect();
-            sftpChannel = (ChannelSftp) channel;
-        } catch (JSchException e) {
-            addError("Unable to get sftp channel to xNF. Data: " + fileServerData, e);
-        }
-        return sftpChannel;
+    private ChannelSftp getChannel(Session session) throws JSchException {
+        Channel channel = session.openChannel("sftp");
+        channel.connect();
+        return (ChannelSftp) channel;
     }
 }
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
index 95de2de..5295b12 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileSystemResourceWrapper.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,14 +20,14 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-
+import java.nio.file.Path;
 import org.springframework.core.io.FileSystemResource;
 
 public class FileSystemResourceWrapper implements IFileSystemResource {
     private FileSystemResource realResource;
 
     @Override
-    public void setPath(String path) {
+    public void setPath(Path path) {
         realResource = new FileSystemResource(path);
     }
     @Override
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java
index 32b6c72..203a598 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/FileWrapper.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,13 +20,14 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 
 public class FileWrapper implements IFile {
     private File file;
 
     @Override
-    public void setPath(String path) {
-        file = new File(path);
+    public void setPath(Path path) {
+        file = path.toFile();
     }
 
     @Override
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java
index a7094f6..2b95842 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFile.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,9 +20,10 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 
 public interface IFile {
-    public void setPath(String path);
+    public void setPath(Path path);
 
     public boolean createNewFile() throws IOException;
 
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
index db30396..23f14a3 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IFileSystemResource.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -18,10 +18,11 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.file.Path;
 
 public interface IFileSystemResource {
 
-    public void setPath(String filePath);
+    public void setPath(Path filePath);
 
     public InputStream getInputStream() throws IOException;
 }
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java
index 1ef790c..8015ea7 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/IOutputStream.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java
index 830a571..8878782 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/io/OutputStreamWrapper.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
index 2e9c848..e99b811 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClient.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
index 2b44233..1e1187a 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtils.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index b3c8c3e..bced3d8 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -23,6 +23,11 @@
 import java.io.InputStream;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
 import java.util.concurrent.Future;
 
 import javax.net.ssl.SSLContext;
@@ -36,17 +41,16 @@
 import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
 import org.apache.http.impl.nio.client.HttpAsyncClients;
 import org.apache.http.ssl.SSLContextBuilder;
-
 import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
 import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
 import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
 import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
 import org.springframework.web.util.DefaultUriBuilderFactory;
 
 import reactor.core.publisher.Flux;
@@ -73,7 +77,7 @@
     private final String user;
     private final String pwd;
 
-    private IFileSystemResource fileResource;
+    private IFileSystemResource fileResource = new FileSystemResourceWrapper();
     private CloseableHttpAsyncClient webClient;
 
     /**
@@ -97,10 +101,10 @@
      * @param consumerDmaapModel - object which will be sent to DMaaP DataRouter
      * @return status code of operation
      */
-    public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
+    public Flux<HttpStatus> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
         logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
         try {
-            logger.trace("Starting to publish to DR");
+            logger.trace("Starting to publish to DR {}",  consumerDmaapModel.getInternalLocation());
 
             webClient = getWebClient();
             webClient.start();
@@ -114,20 +118,10 @@
             HttpResponse response = future.get();
             logger.trace(response.toString());
             webClient.close();
-            handleHttpResponse(response);
-            return Flux.just(response.toString());
+            return Flux.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
         } catch (Exception e) {
-            logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel, e);
-            return Flux.empty();
-        }
-    }
-
-    private void handleHttpResponse(HttpResponse response) {
-        int statusCode = response.getStatusLine().getStatusCode();
-        if (HttpUtils.isSuccessfulResponseCode(statusCode)) {
-            logger.trace("Publish to DR successful!");
-        } else {
-            logger.error("Publish to DR unsuccessful, response code: " + statusCode);
+            logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
+            return Flux.error(e);
         }
     }
 
@@ -142,28 +136,20 @@
 
     private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
         put.addHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType);
-        JsonElement metaData = new JsonParser().parse(new CommonFunctions().createJsonBody(model));
+        JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
         String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
         metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
         put.addHeader(X_DMAAP_DR_META, metaData.toString());
         put.setURI(getUri(name));
     }
 
-    private void prepareBody(ConsumerDmaapModel model, HttpPut put) {
-        String fileLocation = model.getInternalLocation();
-        IFileSystemResource fileSystemResource = getFileSystemResource();
-        fileSystemResource.setPath(fileLocation);
-        InputStream fileInputStream = null;
-        try {
-            fileInputStream = fileSystemResource.getInputStream();
-        } catch (IOException e) {
-            logger.error("Unable to get stream from filesystem.", e);
-        }
-        try {
-            put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
-        } catch (IOException e) {
-            logger.error("Unable to set put request body from ByteArray.", e);
-        }
+    private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
+        Path fileLocation = Paths.get(model.getInternalLocation());
+        this.fileResource.setPath(fileLocation);
+        InputStream fileInputStream = fileResource.getInputStream();
+
+        put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+
     }
 
     private URI getUri(String fileName) {
@@ -172,27 +158,19 @@
                 .path(path).build();
     }
 
-    private IFileSystemResource getFileSystemResource() {
-        if (fileResource == null) {
-            fileResource = new FileSystemResourceWrapper();
-        }
-        return fileResource;
-    }
-
-    protected void setFileSystemResource(IFileSystemResource fileSystemResource) {
+    void setFileSystemResource(IFileSystemResource fileSystemResource) {
         fileResource = fileSystemResource;
     }
 
-    protected CloseableHttpAsyncClient getWebClient() {
+    protected CloseableHttpAsyncClient getWebClient()
+            throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
         if (webClient != null) {
             return webClient;
         }
         SSLContext sslContext = null;
-        try {
-            sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
-        } catch (Exception e) {
-            logger.trace("Unable to get sslContext.", e);
-        }
+
+        sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
+
         //@formatter:off
         return HttpAsyncClients.custom()
                 .setSSLContext(sslContext)
@@ -205,4 +183,4 @@
     protected void setWebClient(CloseableHttpAsyncClient client) {
         this.webClient = client;
     }
-}
+}
\ No newline at end of file
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorDataTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorDataTest.java
deleted file mode 100644
index b4edf82..0000000
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/ErrorDataTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import org.junit.jupiter.api.Test;
-
-public class ErrorDataTest {
-
-    @Test
-    public void emptyData() {
-        ErrorData dataUnderTest = new ErrorData();
-
-        assertEquals("", dataUnderTest.toString());
-    }
-
-    @Test
-    public void withData() {
-        ErrorData dataUnderTest = new ErrorData();
-        dataUnderTest.addError("Error", null);
-        dataUnderTest.addError("Null", new NullPointerException("Null"));
-
-        assertEquals("Error\nNull Cause: java.lang.NullPointerException: Null", dataUnderTest.toString());
-    }
-}
-
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java
deleted file mode 100644
index 38d2423..0000000
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.ftp;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-
-import org.junit.jupiter.api.Test;
-
-public class FileCollectResultTest {
-
-    @Test
-    public void successfulResult() {
-        FileCollectResult resultUnderTest = new FileCollectResult();
-        assertTrue(resultUnderTest.downloadSuccessful());
-        assertEquals("FileCollectResult: successful!", resultUnderTest.toString());
-    }
-
-    @Test
-    public void unSuccessfulResult() {
-        ErrorData errorData = new ErrorData();
-        errorData.addError("Error", null);
-        errorData.addError("Null", new NullPointerException());
-        FileCollectResult resultUnderTest = new FileCollectResult(errorData);
-        assertFalse(resultUnderTest.downloadSuccessful());
-        assertEquals("FileCollectResult: unsuccessful! Error data: " + errorData.toString(),
-                resultUnderTest.toString());
-    }
-}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
index c134b79..c457726 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -16,8 +16,7 @@
 
 package org.onap.dcaegen2.collectors.datafile.ftp;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -29,6 +28,8 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
 import java.security.KeyStoreException;
@@ -40,6 +41,7 @@
 import org.apache.commons.net.ftp.FTPReply;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.io.IFile;
 import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
 import org.onap.dcaegen2.collectors.datafile.io.IOutputStream;
@@ -51,12 +53,12 @@
 public class FtpsClientTest {
 
     private static final String REMOTE_FILE_PATH = "/dir/sample.txt";
-    private static final String LOCAL_FILE_PATH = "target/sample.txt";
+    private static final Path LOCAL_FILE_PATH = Paths.get("target/sample.txt");
     private static final String XNF_ADDRESS = "127.0.0.1";
     private static final int PORT = 8021;
     private static final String FTP_KEY_PATH = "ftpKeyPath";
     private static final String FTP_KEY_PASSWORD = "ftpKeyPassword";
-    private static final String TRUSTED_CA_PATH = "trustedCAPath";
+    private static final Path TRUSTED_CA_PATH = Paths.get("trustedCAPath");
     private static final String TRUSTED_CA_PASSWORD = "trustedCAPassword";
 
     private static final String USERNAME = "bob";
@@ -74,7 +76,14 @@
     private IOutputStream outputStreamMock = mock(IOutputStream.class);
     private InputStream inputStreamMock = mock(InputStream.class);
 
-    FtpsClient clientUnderTest = new FtpsClient();
+    FtpsClient clientUnderTest = new FtpsClient(createFileServerData());
+
+
+    private ImmutableFileServerData createFileServerData() {
+        return ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
+                .userId(USERNAME).password(PASSWORD).port(PORT).build();
+    }
+
 
     @BeforeEach
     protected void setUp() throws Exception {
@@ -88,7 +97,7 @@
 
         clientUnderTest.setKeyCertPath(FTP_KEY_PATH);
         clientUnderTest.setKeyCertPassword(FTP_KEY_PASSWORD);
-        clientUnderTest.setTrustedCAPath(TRUSTED_CA_PATH);
+        clientUnderTest.setTrustedCAPath(TRUSTED_CA_PATH.toString());
         clientUnderTest.setTrustedCAPassword(TRUSTED_CA_PASSWORD);
     }
 
@@ -104,15 +113,10 @@
         when(localFileMock.getFile()).thenReturn(fileMock);
         OutputStream osMock = mock(OutputStream.class);
         when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock);
-        when(ftpsClientMock.retrieveFile(REMOTE_FILE_PATH, osMock)).thenReturn(true);
         when(ftpsClientMock.isConnected()).thenReturn(false, true);
 
-        ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
-                .userId(USERNAME).password(PASSWORD).port(PORT).build();
+        clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH);
 
-        FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
-
-        assertTrue(result.downloadSuccessful());
         verify(ftpsClientMock).setNeedClientAuth(true);
         verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
         verify(ftpsClientMock).setKeyManager(keyManagerMock);
@@ -143,16 +147,14 @@
     public void collectFileFaultyOwnKey_shouldFail() throws Exception {
         doThrow(new IKeyManagerUtils.KeyManagerException(new GeneralSecurityException())).when(keyManagerUtilsMock)
                 .setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+        when(ftpsClientMock.isConnected()).thenReturn(false);
 
-        ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
-                .userId(USERNAME).password(PASSWORD).port(PORT).build();
+        assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+            .hasMessage("org.onap.dcaegen2.collectors.datafile.ssl.IKeyManagerUtils$KeyManagerException: java.security.GeneralSecurityException");
 
-        FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
-
-        assertFalse(result.downloadSuccessful());
         verify(ftpsClientMock).setNeedClientAuth(true);
         verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
-        verify(ftpsClientMock, times(1)).isConnected();
+        verify(ftpsClientMock).isConnected();
         verifyNoMoreInteractions(ftpsClientMock);
     }
 
@@ -164,21 +166,8 @@
 
         doThrow(new KeyStoreException()).when(trustManagerFactoryMock).init(keyStoreMock);
 
-        ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
-                .userId(USERNAME).password(PASSWORD).port(PORT).build();
-
-        FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
-
-        assertFalse(result.downloadSuccessful());
-        verify(ftpsClientMock).setNeedClientAuth(true);
-        verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
-        verify(ftpsClientMock).setKeyManager(keyManagerMock);
-        verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
-        verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
-        verify(inputStreamMock, times(1)).close();
-        verify(trustManagerFactoryMock).init(keyStoreMock);
-        verify(ftpsClientMock, times(1)).isConnected();
-        verifyNoMoreInteractions(ftpsClientMock);
+        assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+            .hasMessage("Unable to trust xNF's CA, trustedCAPath java.security.KeyStoreException");
     }
 
     @Test
@@ -189,12 +178,9 @@
         when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock});
         when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(false);
 
-        ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
-                .userId(USERNAME).password(PASSWORD).port(PORT).build();
+        assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+                .hasMessage("Unable to log in to xNF. 127.0.0.1");
 
-        FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
-
-        assertFalse(result.downloadSuccessful());
         verify(ftpsClientMock).setNeedClientAuth(true);
         verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
         verify(ftpsClientMock).setKeyManager(keyManagerMock);
@@ -205,8 +191,6 @@
         verify(ftpsClientMock).setTrustManager(trustManagerMock);
         verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
         verify(ftpsClientMock).login(USERNAME, PASSWORD);
-        verify(ftpsClientMock, times(3)).isConnected();
-        verifyNoMoreInteractions(ftpsClientMock);
     }
 
     @Test
@@ -218,12 +202,9 @@
         when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true);
         when(ftpsClientMock.getReplyCode()).thenReturn(FTPReply.BAD_COMMAND_SEQUENCE);
 
-        ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
-                .userId(USERNAME).password(PASSWORD).port(PORT).build();
+        assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+            .hasMessage("Unable to connect to xNF. 127.0.0.1 xNF reply code: 503");
 
-        FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
-
-        assertFalse(result.downloadSuccessful());
         verify(ftpsClientMock).setNeedClientAuth(true);
         verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
         verify(ftpsClientMock).setKeyManager(keyManagerMock);
@@ -235,7 +216,7 @@
         verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
         verify(ftpsClientMock).login(USERNAME, PASSWORD);
         verify(ftpsClientMock, times(2)).getReplyCode();
-        verify(ftpsClientMock, times(3)).isConnected();
+        verify(ftpsClientMock, times(2)).isConnected();
         verifyNoMoreInteractions(ftpsClientMock);
     }
 
@@ -248,12 +229,9 @@
 
         doThrow(new IOException()).when(ftpsClientMock).connect(XNF_ADDRESS, PORT);
 
-        ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
-                .userId(USERNAME).password(PASSWORD).port(PORT).build();
+        assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+            .hasMessage("Could not open connection: java.io.IOException");
 
-        FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
-
-        assertFalse(result.downloadSuccessful());
         verify(ftpsClientMock).setNeedClientAuth(true);
         verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
         verify(ftpsClientMock).setKeyManager(keyManagerMock);
@@ -263,7 +241,7 @@
         verify(trustManagerFactoryMock).init(keyStoreMock);
         verify(ftpsClientMock).setTrustManager(trustManagerMock);
         verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
-        verify(ftpsClientMock, times(3)).isConnected();
+        verify(ftpsClientMock, times(2)).isConnected();
         verifyNoMoreInteractions(ftpsClientMock);
     }
 
@@ -278,33 +256,9 @@
 
         doThrow(new IOException()).when(localFileMock).createNewFile();
 
-        ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
-                .userId(USERNAME).password(PASSWORD).port(PORT).build();
+        assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+            .hasMessage("Could not open connection: java.io.IOException");
 
-        FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
-
-        assertFalse(result.downloadSuccessful());
-        verify(localFileMock, times(1)).delete();
-        verify(ftpsClientMock).setNeedClientAuth(true);
-        verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
-        verify(ftpsClientMock).setKeyManager(keyManagerMock);
-        verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
-        verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
-        verify(inputStreamMock, times(1)).close();
-        verify(trustManagerFactoryMock).init(keyStoreMock);
-        verify(ftpsClientMock).setTrustManager(trustManagerMock);
-        verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
-        verify(ftpsClientMock).login(USERNAME, PASSWORD);
-        verify(ftpsClientMock).getReplyCode();
-        verify(ftpsClientMock, times(1)).enterLocalPassiveMode();
-        verify(ftpsClientMock).execPBSZ(0);
-        verify(ftpsClientMock).execPROT("P");
-        verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE);
-        verify(ftpsClientMock).setBufferSize(1024 * 1024);
-        verify(localFileMock).setPath(LOCAL_FILE_PATH);
-        verify(localFileMock, times(1)).createNewFile();
-        verify(ftpsClientMock, times(2)).isConnected();
-        verifyNoMoreInteractions(ftpsClientMock);
     }
 
     @Test
@@ -319,14 +273,11 @@
         when(localFileMock.getFile()).thenReturn(fileMock);
         OutputStream osMock = mock(OutputStream.class);
         when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock);
-        when(ftpsClientMock.retrieveFile(REMOTE_FILE_PATH, osMock)).thenReturn(false);
+        doThrow(new DatafileTaskException("problemas")).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock);
 
-        ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
-                .userId(USERNAME).password(PASSWORD).port(PORT).build();
+        assertThatThrownBy(() -> clientUnderTest.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH))
+            .hasMessage("problemas");
 
-        FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
-
-        assertFalse(result.downloadSuccessful());
         verify(ftpsClientMock).setNeedClientAuth(true);
         verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
         verify(ftpsClientMock).setKeyManager(keyManagerMock);
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java
new file mode 100644
index 0000000..162a0e7
--- /dev/null
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SchemeTest.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.collectors.datafile.ftp;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+
+/**
+ * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
+ *
+ */
+public class SchemeTest {
+    @Test
+    public void getSchemeFromString_properScheme() throws DatafileTaskException {
+
+        Scheme actualScheme = Scheme.getSchemeFromString("FTPES");
+        assertEquals(Scheme.FTPS, actualScheme);
+
+        actualScheme = Scheme.getSchemeFromString("FTPS");
+        assertEquals(Scheme.FTPS, actualScheme);
+
+        actualScheme = Scheme.getSchemeFromString("SFTP");
+        assertEquals(Scheme.SFTP, actualScheme);
+    }
+
+    @Test
+    public void getSchemeFromString_invalidScheme() {
+        assertTrue(assertThrows(DatafileTaskException.class, () -> Scheme.getSchemeFromString("invalid")).getMessage()
+                .startsWith("DFC does not support protocol invalid"));
+    }
+}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
index e9e68bb..7f32e8c 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java
@@ -1,27 +1,25 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  * ============LICENSE_END========================================================================
  */
 
 package org.onap.dcaegen2.collectors.datafile.ftp;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+
 import static org.apache.commons.io.IOUtils.toByteArray;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.github.stefanbirkner.fakesftpserver.rule.FakeSftpServerRule;
@@ -31,19 +29,21 @@
 import com.jcraft.jsch.Session;
 import com.jcraft.jsch.SftpException;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 
 import org.junit.Rule;
 import org.junit.Test;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 
 public class SftpClientTest {
     private static final String USERNAME = "bob";
     private static final String PASSWORD = "123";
     private static final String DUMMY_CONTENT = "dummy content";
-    private static final String LOCAL_DUMMY_FILE = "target/dummy.txt";
+    private static final Path LOCAL_DUMMY_FILE = Paths.get("target/dummy.txt");
     private static final String REMOTE_DUMMY_FILE = "/dummy_directory/dummy_file.txt";
     private static final JSch JSCH = new JSch();
     private static final int TIMEOUT = 2000;
@@ -52,49 +52,53 @@
     public final FakeSftpServerRule sftpServer = new FakeSftpServerRule().addUser(USERNAME, PASSWORD);
 
     @Test
-    public void collectFile_withOKresponse() throws IOException, JSchException, SftpException {
-        SftpClient sftpClient = new SftpClient();
-        sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
-        byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE);
+    public void collectFile_withOKresponse() throws DatafileTaskException, IOException, JSchException, SftpException, Exception {
         FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1")
                 .userId(USERNAME).password(PASSWORD).port(sftpServer.getPort()).build();
-        sftpClient.collectFile(expectedFileServerData, REMOTE_DUMMY_FILE,
-                LOCAL_DUMMY_FILE);
-        byte[] localFile = Files.readAllBytes(new File(LOCAL_DUMMY_FILE).toPath());
+        SftpClient sftpClient = new SftpClient(expectedFileServerData);
+        sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
+        byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE);
+
+        sftpClient.collectFile(REMOTE_DUMMY_FILE, LOCAL_DUMMY_FILE);
+        byte[] localFile = Files.readAllBytes(LOCAL_DUMMY_FILE.toFile().toPath());
         assertThat(new String(file, UTF_8)).isEqualTo(DUMMY_CONTENT);
         assertThat(new String(localFile, UTF_8)).isEqualTo(DUMMY_CONTENT);
     }
 
     @Test
     public void collectFile_withWrongUserName_shouldFail() throws IOException, JSchException, SftpException {
-        SftpClient sftpClient = new SftpClient();
-        sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
-        byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE);
         FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1")
                 .userId("Wrong").password(PASSWORD).port(sftpServer.getPort()).build();
-        FileCollectResult actualResult = sftpClient.collectFile(expectedFileServerData, REMOTE_DUMMY_FILE,
-                LOCAL_DUMMY_FILE);
+        SftpClient sftpClient = new SftpClient(expectedFileServerData);
+        sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
 
-        assertFalse(actualResult.downloadSuccessful());
-        String expectedErrorMessage = "Unable to set up SFTP connection to xNF. Data: "
-                + "FileServerData{serverAddress=127.0.0.1, userId=Wrong, password=123, port=";
-        assertTrue(actualResult.getErrorData().toString().startsWith(expectedErrorMessage));
+        String errorMessage = "";
+        try {
+            sftpClient.collectFile(REMOTE_DUMMY_FILE, LOCAL_DUMMY_FILE);
+        } catch (Exception e) {
+            errorMessage = e.getMessage();
+        }
+
+        assertTrue(errorMessage.contains("Auth fail"));
     }
 
     @Test
     public void collectFile_withWrongFileName_shouldFail() throws IOException, JSchException, SftpException {
-        SftpClient sftpClient = new SftpClient();
-        sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
-        byte[] file = downloadFile(sftpServer, REMOTE_DUMMY_FILE);
         FileServerData expectedFileServerData = ImmutableFileServerData.builder().serverAddress("127.0.0.1")
                 .userId(USERNAME).password(PASSWORD).port(sftpServer.getPort()).build();
-        FileCollectResult actualResult = sftpClient.collectFile(expectedFileServerData, "wrong",
-                LOCAL_DUMMY_FILE);
+        SftpClient sftpClient = new SftpClient(expectedFileServerData);
+        sftpServer.putFile(REMOTE_DUMMY_FILE, DUMMY_CONTENT, UTF_8);
 
-        assertFalse(actualResult.downloadSuccessful());
+        String errorMessage = "";
+        try {
+            sftpClient.collectFile("wrong", LOCAL_DUMMY_FILE);
+        } catch (Exception e) {
+            errorMessage = e.getMessage();
+        }
+
         String expectedErrorMessage = "Unable to get file from xNF. Data: FileServerData{serverAddress=127.0.0.1, "
                 + "userId=bob, password=123, port=";
-        assertTrue(actualResult.getErrorData().toString().startsWith(expectedErrorMessage));
+        assertTrue(errorMessage.startsWith(expectedErrorMessage));
     }
 
     private static Session connectToServer(FakeSftpServerRule sftpServer) throws JSchException {
@@ -133,5 +137,4 @@
             session.disconnect();
         }
     }
-
 }
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java
index 128f78f..54db740 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapReactiveWebClientTest.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java
index 4a9f9c1..c973a12 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/HttpUtilsTest.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -16,9 +16,9 @@
 
 package org.onap.dcaegen2.collectors.datafile.service;
 
-import static org.junit.jupiter.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 class HttpUtilsTest {
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
index beac4ee..a0d3673 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
  * ===============================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -29,6 +29,7 @@
 import java.io.InputStream;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -41,7 +42,6 @@
 import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-
 import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
 import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
@@ -49,6 +49,7 @@
 import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
 import org.springframework.web.util.DefaultUriBuilderFactory;
 
 import reactor.test.StepVerifier;
@@ -122,17 +123,17 @@
     void getHttpResponse_Success() throws Exception {
         mockWebClientDependantObject(true);
 
-        URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT)
-                .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build();
-
         HttpPut httpPut = new HttpPut();
         httpPut.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE);
 
-        JsonElement metaData = new JsonParser().parse(new CommonFunctions().createJsonBody(consumerDmaapModel));
+        URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT)
+                .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build();
+        httpPut.setURI(expectedUri);
+
+        JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
         metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
         metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
         httpPut.addHeader(X_DMAAP_DR_META, metaData.toString());
-        httpPut.setURI(expectedUri);
 
         String plainCreds = "dradmin" + ":" + "dradmin";
         byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
@@ -142,9 +143,9 @@
 
         fileStream.reset();
         StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
-        .expectNext(responseMock.toString()).verifyComplete();
+        .expectNext(HttpStatus.OK).verifyComplete();
 
-        verify(fileSystemResourceMock).setPath("target/" + FILE_NAME);
+        verify(fileSystemResourceMock).setPath(Paths.get("target/" + FILE_NAME));
         InputStream fileInputStream = fileSystemResourceMock.getInputStream();
         httpPut.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
     }
@@ -153,7 +154,8 @@
     void getHttpResponse_Fail() throws Exception {
         mockWebClientDependantObject(false);
         StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
-                .verifyComplete();
+        .expectError()
+        .verify();
     }
 
     private void mockWebClientDependantObject(boolean success)
diff --git a/pom.xml b/pom.xml
index 5bfab1c..c103c5d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
   ~ ============LICENSE_START=====================================================================
-  ~ Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
+  ~ Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
   ~ ==============================================================================================
   ~ Licensed under the Apache License, Version 2.0 (the "License");
   ~ you may not use this file except in compliance with the License.
@@ -167,14 +167,7 @@
         <groupId>org.apache.httpcomponents</groupId>
         <artifactId>httpasyncclient</artifactId>
         <version>4.1.4</version>
-      </dependency>
-      <dependency>
-        <groupId>io.projectreactor</groupId>
-        <artifactId>reactor-bom</artifactId>
-        <version>Bismuth-SR10</version>
-        <type>pom</type>
-        <scope>import</scope>
-      </dependency>
+      </dependency>      
       <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-dependencies</artifactId>