Merge "Add missing INFO.yaml"
diff --git a/datafile-app-server/config/application.yaml b/datafile-app-server/config/application.yaml
index cef185c..b66f7b6 100644
--- a/datafile-app-server/config/application.yaml
+++ b/datafile-app-server/config/application.yaml
@@ -14,6 +14,7 @@
     ROOT: ERROR
     org.springframework: ERROR
     org.springframework.data: ERROR
+    org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
     org.onap.dcaegen2.collectors.datafile: ERROR
   file: opt/log/application.log
 app:
diff --git a/datafile-app-server/pom.xml b/datafile-app-server/pom.xml
index 51d9cec..aaa9c1a 100644
--- a/datafile-app-server/pom.xml
+++ b/datafile-app-server/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.collectors</groupId>
     <artifactId>datafile</artifactId>
-    <version>1.1.0-SNAPSHOT</version>
+    <version>1.1.1-SNAPSHOT</version>
   </parent>
 
   <groupId>org.onap.dcaegen2.collectors.datafile</groupId>
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
index e828776..46c6e94 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParser.java
@@ -26,8 +26,6 @@
 import java.util.Optional;
 import java.util.stream.StreamSupport;
 
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapEmptyResponseException;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.FileMetaData;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData;
@@ -92,35 +90,38 @@
      * @return reactive Mono with an array of FileData
      */
     public Flux<FileData> getJsonObject(Mono<String> rawMessage) {
-        return rawMessage.flatMap(this::getJsonParserMessage).flatMapMany(this::createJsonConsumerModel);
+        return rawMessage.flatMapMany(this::getJsonParserMessage).flatMap(this::createJsonConsumerModel);
     }
 
     private Mono<JsonElement> getJsonParserMessage(String message) {
         logger.trace("original message from message router: {}", message);
-        return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
-                : Mono.fromSupplier(() -> new JsonParser().parse(message));
+        return StringUtils.isEmpty(message) ? Mono.empty() : Mono.fromSupplier(() -> new JsonParser().parse(message));
     }
 
     private Flux<FileData> createJsonConsumerModel(JsonElement jsonElement) {
-        return jsonElement.isJsonObject() ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
+        return jsonElement.isJsonObject() ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
                 : getFileDataFromJsonArray(jsonElement);
     }
 
     private Flux<FileData> getFileDataFromJsonArray(JsonElement jsonElement) {
-        return create(Mono.fromCallable(() -> StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
-                .findFirst().flatMap(this::getJsonObjectFromAnArray).orElseThrow(DmaapEmptyResponseException::new)));
+        return create(
+                Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
+                        .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+                                .orElseGet(JsonObject::new)))));
     }
 
     public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
-        logger.trace("starting to getJsonObjectFromAnArray!");
-
-        return Optional.of(new JsonParser().parse(element.getAsString()).getAsJsonObject());
+        JsonParser jsonParser = new JsonParser();
+        return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
+                : element.isJsonObject() ? Optional.of((JsonObject) element)
+                        : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
     }
 
-    private Flux<FileData> create(Mono<JsonObject> jsonObject) {
-        return jsonObject.flatMapMany(monoJsonP -> !containsNotificationFields(monoJsonP)
-                ? Flux.error(new DmaapNotFoundException("Incorrect JsonObject - missing header. " + jsonObject))
-                : transform(monoJsonP));
+    private Flux<FileData> create(Flux<JsonObject> jsonObject) {
+        // Objects lacking event/notificationFields are logged (the element itself, not the Flux) and dropped.
+        return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
+                ? logErrorAndReturnEmptyFlux("Incorrect JsonObject - missing header. " + monoJsonP)
+                : transform(monoJsonP));
+    }
 
     private Flux<FileData> transform(JsonObject message) {
@@ -132,11 +133,11 @@
                 return getAllFileDataFromJson(fileMetaData.get(), arrayOfNamedHashMap);
             }
 
-            return Flux.error(new DmaapNotFoundException(
-                    "Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. " + message));
+            logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
+            return Flux.empty();
         }
-        return Flux.error(new DmaapNotFoundException(
-                "Unable to collect file from xNF. FileReady event has incorrect JsonObject"));
+        logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message);
+        return Flux.empty();
     }
 
     private Optional<FileMetaData> getFileMetaData(JsonObject message) {
@@ -191,13 +192,11 @@
     private Flux<FileData> getAllFileDataFromJson(FileMetaData fileMetaData, JsonArray arrayOfAdditionalFields) {
         List<FileData> res = new ArrayList<>();
         for (int i = 0; i < arrayOfAdditionalFields.size(); i++) {
-            if (arrayOfAdditionalFields.get(i) != null) {
-                JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
-                Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
+            JsonObject fileInfo = (JsonObject) arrayOfAdditionalFields.get(i);
+            Optional<FileData> fileData = getFileDataFromJson(fileMetaData, fileInfo);
 
-                if (fileData.isPresent()) {
-                    res.add(fileData.get());
-                }
+            if (fileData.isPresent()) {
+                res.add(fileData.get());
             }
         }
         return Flux.fromIterable(res);
@@ -260,4 +259,9 @@
     private boolean containsNotificationFields(JsonObject jsonObject) {
         return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(NOTIFICATION_FIELDS);
     }
+
+    private Flux<FileData> logErrorAndReturnEmptyFlux(String errorMessage) {
+        logger.error(errorMessage);
+        return Flux.empty();
+    }
 }
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index 171dd02..c465fe9 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -25,6 +25,7 @@
 import org.springframework.stereotype.Component;
 
 import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -59,11 +60,18 @@
      */
     public void scheduleMainDatafileEventTask() {
         logger.trace("Execution of tasks was registered");
-
+        //@formatter:off
         consumeFromDmaapMessage()
+                .publishOn(Schedulers.parallel())
+                .cache()
                 .doOnError(DmaapEmptyResponseException.class, error -> logger.info("Nothing to consume from DMaaP"))
-                .flatMap(this::collectFilesFromXnf).flatMap(this::publishToDmaapConfiguration)
+                .flatMap(this::collectFilesFromXnf)
+                .retry(3)
+                .cache()
+                .flatMap(this::publishToDmaapConfiguration)
+                .retry(3)
                 .subscribe(this::onSuccess, this::onError, this::onComplete);
+        //@formatter:on
     }
 
     private void onComplete() {
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
index a9bc546..0ae9ece 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/DmaapConsumerJsonParserTest.java
@@ -60,7 +60,7 @@
     private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
 
     @Test
-    void whenPassingCorrectJson_validationNotThrowingAnException() throws DmaapNotFoundException {
+    void whenPassingCorrectJson_oneFileData() throws DmaapNotFoundException {
         // @formatter:off
         AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
                 .name(PM_FILE_NAME)
@@ -108,7 +108,98 @@
     }
 
     @Test
-    void whenPassingCorrectJsonWithFaultyEventName_validationThrowingAnException() {
+    void whenPassingCorrectJsonWithTwoEvents_twoFileData() throws DmaapNotFoundException {
+        // @formatter:off
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder()
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalField)
+                .build();
+
+        FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+                .productName(PRODUCT_NAME)
+                .vendorName(VENDOR_NAME)
+                .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
+                .sourceName(SOURCE_NAME)
+                .startEpochMicrosec(START_EPOCH_MICROSEC)
+                .timeZoneOffset(TIME_ZONE_OFFSET)
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .build();
+        FileData expectedFileData = ImmutableFileData.builder()
+                .fileMetaData(fileMetaData)
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        // @formatter:on
+        String parsedString = message.getParsed();
+        String messageString = "[" + parsedString + "," + parsedString + "]";
+        DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+                .expectNext(expectedFileData).expectNext(expectedFileData).verifyComplete();
+    }
+
+    @Test
+    void whenPassingCorrectJsonWithTwoEventsFirstNoHeader_oneFileDatan()
+            throws DmaapNotFoundException {
+        // @formatter:off
+        AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        JsonMessage message = new JsonMessage.JsonMessageBuilder()
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .addAdditionalField(additionalField)
+                .build();
+
+        FileMetaData fileMetaData = ImmutableFileMetaData.builder()
+                .productName(PRODUCT_NAME)
+                .vendorName(VENDOR_NAME)
+                .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
+                .sourceName(SOURCE_NAME)
+                .startEpochMicrosec(START_EPOCH_MICROSEC)
+                .timeZoneOffset(TIME_ZONE_OFFSET)
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .build();
+        FileData expectedFileData = ImmutableFileData.builder()
+                .fileMetaData(fileMetaData)
+                .name(PM_FILE_NAME)
+                .location(LOCATION)
+                .compression(GZIP_COMPRESSION)
+                .fileFormatType(FILE_FORMAT_TYPE)
+                .fileFormatVersion(FILE_FORMAT_VERSION)
+                .build();
+        // @formatter:on
+        String parsedString = message.getParsed();
+        String messageString = "[{\"event\":{}}," + parsedString + "]";
+        DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+                .expectNext(expectedFileData).verifyComplete();
+    }
+
+    @Test
+    void whenPassingCorrectJsonWithFaultyEventName_noFileData() {
         // @formatter:off
         AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
                 .location(LOCATION)
@@ -132,7 +223,7 @@
                 .getJsonObjectFromAnArray(jsonElement);
 
         StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
-                .expectError(DmaapNotFoundException.class).verify();
+                .expectComplete().verify();
     }
 
     @Test
@@ -164,6 +255,27 @@
     }
 
     @Test
+    void whenPassingCorrectJsonWithoutAdditionalFields_noFileData() {
+        // @formatter:off
+        JsonMessage message = new JsonMessage.JsonMessageBuilder()
+                .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
+                .changeIdentifier(CHANGE_IDENTIFIER)
+                .changeType(CHANGE_TYPE)
+                .notificationFieldsVersion(NOTIFICATION_FIELDS_VERSION)
+                .build();
+        // @formatter:on
+        String messageString = message.toString();
+        String parsedString = message.getParsed();
+        DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
+        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
+        .getJsonObjectFromAnArray(jsonElement);
+
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
+        .expectNextCount(0).verifyComplete();
+    }
+
+    @Test
     void whenPassingCorrectJsonWithoutLocation_noFileData() {
         // @formatter:off
         AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
@@ -303,7 +415,7 @@
     }
 
     @Test
-    void whenPassingJsonWithoutMandatoryHeaderInformation_validationThrowingAnException() {
+    void whenPassingJsonWithoutMandatoryHeaderInformation_noFileData() {
         // @formatter:off
         JsonMessage message = new JsonMessage.JsonMessageBuilder()
                 .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
@@ -320,27 +432,23 @@
                 .getJsonObjectFromAnArray(jsonElement);
 
         StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
-                .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+                .expectSubscription().expectComplete().verify();
     }
 
     @Test
-    void whenPassingJsonWithNullJsonElement_validationThrowingAnException() {
-        JsonMessage message = new JsonMessage.JsonMessageBuilder().build();
-
-        String incorrectMessageString = message.toString();
-        String parsedString = message.getParsed();
+    void whenPassingJsonWithNullJsonElement_noFileData() {
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
-        JsonElement jsonElement = new JsonParser().parse(parsedString);
+        JsonElement jsonElement = new JsonParser().parse("{}");
 
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject())).when(dmaapConsumerJsonParser)
                 .getJsonObjectFromAnArray(jsonElement);
 
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageString)))
-                .expectSubscription().expectError(DmaapNotFoundException.class).verify();
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just("[{}]"))).expectSubscription()
+                .expectComplete().verify();
     }
 
     @Test
-    void whenPassingCorrectJsonWithIncorrectChangeType_validationThrowingAnException() {
+    void whenPassingCorrectJsonWithIncorrectChangeType_noFileData() {
         // @formatter:off
         AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
                 .name(PM_FILE_NAME)
@@ -364,11 +472,11 @@
                 .getJsonObjectFromAnArray(jsonElement);
 
         StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
-                .expectNextCount(0).expectError(DmaapNotFoundException.class).verify();
+                .expectNextCount(0).expectComplete().verify();
     }
 
     @Test
-    void whenPassingCorrectJsonWithIncorrectChangeIdentifier_validationThrowingAnException() {
+    void whenPassingCorrectJsonWithIncorrectChangeIdentifier_noFileData() {
         // @formatter:off
         AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder()
                 .name(PM_FILE_NAME)
@@ -392,6 +500,6 @@
                 .getJsonObjectFromAnArray(jsonElement);
 
         StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageString))).expectSubscription()
-                .expectNextCount(0).expectError(DmaapNotFoundException.class).verify();
+                .expectComplete().verify();
     }
 }
diff --git a/datafile-commons/pom.xml b/datafile-commons/pom.xml
index 393d0a6..4ef2c68 100644
--- a/datafile-commons/pom.xml
+++ b/datafile-commons/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.collectors</groupId>
     <artifactId>datafile</artifactId>
-    <version>1.1.0-SNAPSHOT</version>
+    <version>1.1.1-SNAPSHOT</version>
   </parent>
 
   <groupId>org.onap.dcaegen2.collectors.datafile</groupId>
diff --git a/datafile-dmaap-client/pom.xml b/datafile-dmaap-client/pom.xml
index 9f1d21d..6d813c8 100644
--- a/datafile-dmaap-client/pom.xml
+++ b/datafile-dmaap-client/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.collectors</groupId>
     <artifactId>datafile</artifactId>
-    <version>1.1.0-SNAPSHOT</version>
+    <version>1.1.1-SNAPSHOT</version>
   </parent>
 
   <groupId>org.onap.dcaegen2.collectors.datafile</groupId>
@@ -37,9 +37,16 @@
   </properties>
 
   <dependencies>
-
     <!-- DEVELOPMENT DEPENDENCIES -->
     <dependency>
+      <groupId>org.asynchttpclient</groupId>
+      <artifactId>async-http-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpasyncclient</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.onap.dcaegen2.collectors.datafile</groupId>
       <artifactId>datafile-commons</artifactId>
       <version>${project.parent.version}</version>
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
index 2ccf1ba..4b7cc01 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FTPSClientWrapper.java
@@ -2,17 +2,15 @@
  * ============LICENSE_START======================================================================
  * Copyright (C) 2018 Nordix Foundation. All rights reserved.
  * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  * ============LICENSE_END========================================================================
  */
 
@@ -75,6 +73,11 @@
     }
 
     @Override
+    public void setFileType(int fileType) throws IOException {
+        ftpsClient.setFileType(fileType);
+    }
+
+    @Override
     public void execPBSZ(int psbz) throws IOException {
         ftpsClient.execPBSZ(psbz);
     }
@@ -93,4 +96,14 @@
     public void setTimeout(Integer t) {
         this.ftpsClient.setDefaultTimeout(t);
     }
+
+    @Override
+    public boolean isConnected() {
+        return ftpsClient.isConnected();
+    }
+
+    @Override
+    public void setBufferSize(int bufSize) {
+        ftpsClient.setBufferSize(bufSize);
+    }
 }
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java
index 9b6eacb..fa1d431 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResult.java
@@ -2,17 +2,15 @@
  * ============LICENSE_START======================================================================
  * Copyright (C) 2018 Nordix Foundation. All rights reserved.
  * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  * ============LICENSE_END========================================================================
  */
 
@@ -44,6 +42,7 @@
 
     @Override
     public String toString() {
-        return "FileCollectResult: " + result + " Error data: " + getErrorData();
+        return "FileCollectResult: "
+                + (downloadSuccessful() ? "successful!" : "unsuccessful! Error data: " + getErrorData());
     }
 }
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
index 8247bcc..0d055fc 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClient.java
@@ -22,6 +22,7 @@
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
 
+import org.apache.commons.net.ftp.FTP;
 import org.apache.commons.net.ftp.FTPReply;
 import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
 import org.onap.dcaegen2.collectors.datafile.io.FileWrapper;
@@ -72,7 +73,6 @@
 
         if (setUpKeyManager(ftps) && setUpTrustedCA(ftps) && setUpConnection(ftps)) {
             if (getFileFromxNF(ftps)) {
-                closeDownConnection(ftps);
                 fileCollectResult = new FileCollectResult();
             } else {
                 fileCollectResult = new FileCollectResult(errorData);
@@ -80,6 +80,7 @@
         } else {
             fileCollectResult = new FileCollectResult(errorData);
         }
+        closeDownConnection(ftps);
         logger.trace("retryCollectFile left with result: {}", fileCollectResult);
         return fileCollectResult;
     }
@@ -87,6 +88,7 @@
     private boolean setUpKeyManager(IFTPSClient ftps) {
         boolean result = true;
         if (keyManagerSet) {
+            logger.trace("keyManager already set!");
             return result;
         }
         try {
@@ -105,6 +107,7 @@
     private boolean setUpTrustedCA(IFTPSClient ftps) {
         boolean result = true;
         if (trustManagerSet) {
+            logger.trace("trustManager already set!");
             return result;
         }
         try {
@@ -130,32 +133,42 @@
     private boolean setUpConnection(IFTPSClient ftps) {
         boolean result = true;
         try {
+            if (ftps.isConnected()) {
+                addError(
+                        "Looks like previous ftp connection is still in use, will retry in 1 minute. " + fileServerData,
+                        null);
+                return false;
+            }
             ftps.connect(fileServerData.serverAddress(), fileServerData.port());
             logger.trace("after ftp connect");
             boolean loginSuccesful = ftps.login(fileServerData.userId(), fileServerData.password());
             if (!loginSuccesful) {
-                ftps.logout();
+                closeDownConnection(ftps);
                 addError("Unable to log in to xNF. " + fileServerData, null);
-                result = false;
+                return false;
             }
 
             if (loginSuccesful && FTPReply.isPositiveCompletion(ftps.getReplyCode())) {
                 ftps.enterLocalPassiveMode();
+                ftps.setFileType(FTP.BINARY_FILE_TYPE);
                 // Set protection buffer size
                 ftps.execPBSZ(0);
                 // Set data channel protection to private
                 ftps.execPROT("P");
+                ftps.setBufferSize(1024 * 1024);
             } else {
-                ftps.disconnect();
                 addError("Unable to connect to xNF. " + fileServerData + " xNF reply code: " + ftps.getReplyCode(),
                         null);
+                closeDownConnection(ftps); // keep after addError: logout()/disconnect() may change getReplyCode()
-                result = false;
+                return false;
             }
-        } catch (Exception ex) {
-            addError("Unable to connect to xNF. Data: " + fileServerData, ex);
-            result = false;
+        } catch (Exception e) {
+            logger.trace("connect to ftp server failed.", e);
+            addError("Unable to connect to xNF. Data: " + fileServerData, e);
+            closeDownConnection(ftps);
+            return false;
         }
-        logger.trace("setUpConnection return value: {}", result);
+        logger.trace("setUpConnection successfully!");
         return result;
     }
 
@@ -169,8 +182,9 @@
 
             IOutputStream outputStream = getOutputStream();
             OutputStream output = outputStream.getOutputStream(outfile.getFile());
-
+            logger.trace("begin to retrieve from xNF.");
             result = ftps.retrieveFile(remoteFile, output);
+            logger.trace("end retrieve from xNF.");
             if (!result) {
                 output.close();
                 logger.debug("Unable to retrieve file from xNF. Cause unknown!");
@@ -186,20 +200,26 @@
             } catch (Exception e) {
                 logger.trace("Unable to delete file {}.", localFile, e);
             }
-            result = false;
+            return false;
         }
         return result;
     }
 
     private void closeDownConnection(IFTPSClient ftps) {
         logger.trace("starting to closeDownConnection");
-        try {
-            if (ftps != null) {
-                ftps.logout();
-                ftps.disconnect();
+        if (ftps != null && ftps.isConnected()) {
+            try {
+                boolean logOut = ftps.logout();
+                logger.trace("logOut: {}", logOut);
+            } catch (Exception e) {
+                logger.trace("Unable to logout connection.", e);
             }
-        } catch (Exception e) {
-            logger.trace("Unable to logout and close connection.", e);
+            try {
+                ftps.disconnect();
+                logger.trace("disconnected!");
+            } catch (Exception e) {
+                logger.trace("Unable to disconnect connection.", e);
+            }
         }
     }
 
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
index b147202..1a58163 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/ftp/IFTPSClient.java
@@ -2,17 +2,15 @@
  * ============LICENSE_START======================================================================
  * Copyright (C) 2018 Nordix Foundation. All rights reserved.
  * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  * ============LICENSE_END========================================================================
  */
 
@@ -39,10 +37,16 @@
 
     public int getReplyCode();
 
+    public void setBufferSize(int bufSize);
+
+    public boolean isConnected();
+
     public void disconnect() throws IOException;
 
     public void enterLocalPassiveMode();
 
+    public void setFileType(int fileType) throws IOException;
+
     public void execPBSZ(int newParam) throws IOException;
 
     public void execPROT(String prot) throws IOException;
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
index 380eac8..0e95b0b 100644
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
+++ b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClient.java
@@ -23,26 +23,29 @@
 import java.io.InputStream;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.Future;
+
+import javax.net.ssl.SSLContext;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.ssl.SSLContextBuilder;
 import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.collectors.datafile.io.FileSystemResourceWrapper;
 import org.onap.dcaegen2.collectors.datafile.io.IFileSystemResource;
 import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.web.IRestTemplate;
-import org.onap.dcaegen2.collectors.datafile.web.RestTemplateWrapper;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
+import org.onap.dcaegen2.collectors.datafile.web.PublishRedirectStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpEntity;
 import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
 import org.springframework.web.util.DefaultUriBuilderFactory;
 
 import reactor.core.publisher.Flux;
@@ -70,7 +73,7 @@
     private final String pwd;
 
     private IFileSystemResource fileResource;
-    private IRestTemplate restTemplate;
+    private CloseableHttpAsyncClient webClient;
 
     /**
      * Constructor DmaapProducerReactiveHttpClient.
@@ -78,7 +81,6 @@
      * @param dmaapPublisherConfiguration - DMaaP producer configuration object
      */
     public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
-
         this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
         this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
         this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
@@ -97,54 +99,70 @@
     public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
         logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
         try {
-            HttpHeaders headers = new HttpHeaders();
-            headers.setContentType(MediaType.parseMediaType(dmaapContentType));
-            addMetaDataToHead(consumerDmaapModel, headers);
-
-            addUserCredentialsToHead(headers);
-
-            IFileSystemResource fileSystemResource = getFileSystemResource();
-            fileSystemResource.setPath(consumerDmaapModel.getInternalLocation());
-            InputStream fileInputStream = fileSystemResource.getInputStream();
-            HttpEntity<byte[]> request = addFileToRequest(fileInputStream, headers);
-
-
             logger.trace("Starting to publish to DR");
-            ResponseEntity<String> responseEntity = getRestTemplate().exchange(getUri(consumerDmaapModel.getName()),
-                    HttpMethod.PUT, request, String.class);
 
-            return Flux.just(responseEntity.getStatusCode().toString());
+            HttpPut put = new HttpPut();
+            prepareHead(consumerDmaapModel, put);
+            prepareBody(consumerDmaapModel, put);
+            addUserCredentialsToHead(put);
+
+            // Use the client locally: closed on every path, and not cached in the field after close.
+            try (CloseableHttpAsyncClient client = getWebClient()) {
+                client.start();
+                Future<HttpResponse> future = client.execute(put, null);
+                HttpResponse response = future.get();
+                logger.trace("{}", response);
+                handleHttpResponse(response);
+                return Flux.just(response.toString());
+            }
         } catch (Exception e) {
             logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel, e);
             return Flux.empty();
         }
     }
 
-    private void addUserCredentialsToHead(HttpHeaders headers) {
+    private void handleHttpResponse(HttpResponse response) { // logs the publish outcome by status code
+        int statusCode = response.getStatusLine().getStatusCode();
+        if (HttpUtils.isSuccessfulResponseCode(statusCode)) {
+            logger.trace("Publish to DR successful!");
+        } else {
+            logger.error("Publish to DR unsuccessful, response code: {}", statusCode);
+        }
+    }
+
+    private void addUserCredentialsToHead(HttpPut put) {
         String plainCreds = user + ":" + pwd;
         byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
         byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
         String base64Creds = new String(base64CredsBytes);
         logger.trace("base64Creds...: {}", base64Creds);
-        headers.add("Authorization", "Basic " + base64Creds);
+        put.addHeader("Authorization", "Basic " + base64Creds);
     }
 
-    private void addMetaDataToHead(ConsumerDmaapModel consumerDmaapModel, HttpHeaders headers) {
-        JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
-        metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
+    private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
+        put.addHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType);
+        JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+        String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
         metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
-        headers.set(X_ATT_DR_META, metaData.toString());
-    }
-    private HttpEntity<byte[]> addFileToRequest(InputStream inputStream, HttpHeaders headers)
-            throws IOException {
-        return new HttpEntity<>(IOUtils.toByteArray(inputStream), headers);
+        put.addHeader(X_ATT_DR_META, metaData.toString());
+        put.setURI(getUri(name));
     }
 
-    private IRestTemplate getRestTemplate() throws NoSuchAlgorithmException, KeyManagementException, KeyStoreException {
-        if (restTemplate == null) {
-            restTemplate = new RestTemplateWrapper();
+    private void prepareBody(ConsumerDmaapModel model, HttpPut put) { // loads the local file into the PUT body
+        String fileLocation = model.getInternalLocation();
+        IFileSystemResource fileSystemResource = getFileSystemResource();
+        fileSystemResource.setPath(fileLocation);
+        InputStream fileInputStream = null; // stays null when opening the file below fails
+        try {
+            fileInputStream = fileSystemResource.getInputStream();
+        } catch (IOException e) {
+            logger.error("Unable to get stream from filesystem.", e);
         }
-        return restTemplate;
+        try (InputStream body = fileInputStream) { // IOUtils.toByteArray does not close its input
+            put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(body)));
+        } catch (IOException e) {
+            logger.error("Unable to set put request body from ByteArray.", e);
+        }
     }
 
     private URI getUri(String fileName) {
@@ -164,7 +182,26 @@
         fileResource = fileSystemResource;
     }
 
-    protected void setRestTemplate(IRestTemplate restTemplate) {
-        this.restTemplate = restTemplate;
+    protected CloseableHttpAsyncClient getWebClient() { // returns the client set on this object, else builds one
+        if (webClient != null) {
+            return webClient;
+        }
+        SSLContext sslContext = null; // remains null if the builder below throws
+        try { // NOTE(review): the trust strategy returns true for every certificate - confirm this is intended
+            sslContext = new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
+        } catch (Exception e) {
+            logger.trace("Unable to get sslContext.", e); // failure is only visible at trace level
+        }
+        //@formatter:off
+        return HttpAsyncClients.custom() // the new client is returned, not stored in the webClient field here
+                .setSSLContext(sslContext)
+                .setSSLHostnameVerifier(new NoopHostnameVerifier())
+                .setRedirectStrategy(PublishRedirectStrategy.INSTANCE)
+                .build();
+        //@formatter:on
+    }
+
+    protected void setWebClient(CloseableHttpAsyncClient client) {
+        this.webClient = client;
     }
 }
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/IRestTemplate.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/IRestTemplate.java
deleted file mode 100644
index 07e7563..0000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/IRestTemplate.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.web;
-
-import java.net.URI;
-
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.ResponseEntity;
-
-@FunctionalInterface
-public interface IRestTemplate {
-    public ResponseEntity<String> exchange(URI url, HttpMethod method, HttpEntity<byte[]> requestEntity,
-            Class<String> responseType);
-}
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptor.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptor.java
deleted file mode 100644
index 15d459f..0000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptor.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-package org.onap.dcaegen2.collectors.datafile.web;
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpRequest;
-import org.springframework.http.client.ClientHttpRequestExecution;
-import org.springframework.http.client.ClientHttpRequestInterceptor;
-import org.springframework.http.client.ClientHttpResponse;
-
-public class RequestResponseLoggingInterceptor implements ClientHttpRequestInterceptor {
-
-    private final Logger log = LoggerFactory.getLogger(this.getClass());
-
-    @Override
-    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
-        logRequest(request, body);
-        ClientHttpResponse response = execution.execute(request, body);
-        logResponse(response);
-        return response;
-    }
-
-    private void logRequest(HttpRequest request, byte[] body) throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("===========================request begin================================================");
-            log.debug("URI         : {}", request.getURI());
-            log.debug("Method      : {}", request.getMethod());
-            log.debug("Headers     : {}", request.getHeaders());
-            log.debug("Request body: {}", new String(body, "UTF-8"));
-            log.debug("==========================request end================================================");
-        }
-    }
-
-    private void logResponse(ClientHttpResponse response) throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug("============================response begin==========================================");
-            log.debug("Status code  : {}", response.getStatusCode());
-            log.debug("Status text  : {}", response.getStatusText());
-            log.debug("Headers      : {}", response.getHeaders());
-            log.debug("=======================response end=================================================");
-        }
-    }
-}
\ No newline at end of file
diff --git a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapper.java b/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapper.java
deleted file mode 100644
index 99ead84..0000000
--- a/datafile-dmaap-client/src/main/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapper.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.web;
-
-import java.net.URI;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.util.Collections;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.ssl.SSLContextBuilder;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.ResponseEntity;
-import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
-import org.springframework.web.client.RestTemplate;
-
-public class RestTemplateWrapper implements IRestTemplate {
-    private RestTemplate restTemplate;
-
-    public RestTemplateWrapper() throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
-        SSLContext sslContext =
-                new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
-        CloseableHttpClient httpClient =
-                HttpClients.custom().setSSLContext(sslContext).setSSLHostnameVerifier(new NoopHostnameVerifier())
-                        .setRedirectStrategy(new PublishRedirectStrategy()).build();
-
-        HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory();
-        requestFactory.setHttpClient(httpClient);
-
-        restTemplate = new RestTemplate(requestFactory);
-        restTemplate.setInterceptors(Collections.singletonList(new RequestResponseLoggingInterceptor()));
-
-    }
-
-    @Override
-    public ResponseEntity<String> exchange(URI url, HttpMethod method, HttpEntity<byte[]> requestEntity,
-            Class<String> responseType) {
-        return restTemplate.exchange(url,  method, requestEntity, responseType);
-    }
-
-}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java
index 083727e..38d2423 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FileCollectResultTest.java
@@ -2,17 +2,15 @@
  * ============LICENSE_START======================================================================
  * Copyright (C) 2018 Nordix Foundation. All rights reserved.
  * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
  * ============LICENSE_END========================================================================
  */
 
@@ -30,7 +28,7 @@
     public void successfulResult() {
         FileCollectResult resultUnderTest = new FileCollectResult();
         assertTrue(resultUnderTest.downloadSuccessful());
-        assertEquals("FileCollectResult: true Error data: ", resultUnderTest.toString());
+        assertEquals("FileCollectResult: successful!", resultUnderTest.toString());
     }
 
     @Test
@@ -40,6 +38,7 @@
         errorData.addError("Null", new NullPointerException());
         FileCollectResult resultUnderTest = new FileCollectResult(errorData);
         assertFalse(resultUnderTest.downloadSuccessful());
-        assertEquals("FileCollectResult: false Error data: " + errorData.toString(), resultUnderTest.toString());
+        assertEquals("FileCollectResult: unsuccessful! Error data: " + errorData.toString(),
+                resultUnderTest.toString());
     }
 }
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
index 2157e17..c134b79 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java
@@ -36,6 +36,8 @@
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.TrustManager;
 
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPReply;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.io.IFile;
@@ -88,7 +90,7 @@
         clientUnderTest.setKeyCertPassword(FTP_KEY_PASSWORD);
         clientUnderTest.setTrustedCAPath(TRUSTED_CA_PATH);
         clientUnderTest.setTrustedCAPassword(TRUSTED_CA_PASSWORD);
-}
+    }
 
     @Test
     public void collectFile_allOk() throws Exception {
@@ -103,6 +105,7 @@
         OutputStream osMock = mock(OutputStream.class);
         when(outputStreamMock.getOutputStream(fileMock)).thenReturn(osMock);
         when(ftpsClientMock.retrieveFile(REMOTE_FILE_PATH, osMock)).thenReturn(true);
+        when(ftpsClientMock.isConnected()).thenReturn(false, true);
 
         ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
                 .userId(USERNAME).password(PASSWORD).port(PORT).build();
@@ -124,19 +127,22 @@
         verify(ftpsClientMock, times(1)).enterLocalPassiveMode();
         verify(ftpsClientMock).execPBSZ(0);
         verify(ftpsClientMock).execPROT("P");
+        verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE);
+        verify(ftpsClientMock).setBufferSize(1024 * 1024);
         verify(localFileMock).setPath(LOCAL_FILE_PATH);
         verify(localFileMock, times(1)).createNewFile();
         verify(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock);
         verify(osMock, times(1)).close();
         verify(ftpsClientMock, times(1)).logout();
         verify(ftpsClientMock, times(1)).disconnect();
+        verify(ftpsClientMock, times(2)).isConnected();
         verifyNoMoreInteractions(ftpsClientMock);
     }
 
     @Test
     public void collectFileFaultyOwnKey_shouldFail() throws Exception {
-        doThrow(new IKeyManagerUtils.KeyManagerException(new GeneralSecurityException()))
-                .when(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+        doThrow(new IKeyManagerUtils.KeyManagerException(new GeneralSecurityException())).when(keyManagerUtilsMock)
+                .setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
 
         ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
                 .userId(USERNAME).password(PASSWORD).port(PORT).build();
@@ -144,6 +150,10 @@
         FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
 
         assertFalse(result.downloadSuccessful());
+        verify(ftpsClientMock).setNeedClientAuth(true);
+        verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+        verify(ftpsClientMock, times(1)).isConnected();
+        verifyNoMoreInteractions(ftpsClientMock);
     }
 
     @Test
@@ -160,6 +170,15 @@
         FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
 
         assertFalse(result.downloadSuccessful());
+        verify(ftpsClientMock).setNeedClientAuth(true);
+        verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+        verify(ftpsClientMock).setKeyManager(keyManagerMock);
+        verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
+        verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
+        verify(inputStreamMock, times(1)).close();
+        verify(trustManagerFactoryMock).init(keyStoreMock);
+        verify(ftpsClientMock, times(1)).isConnected();
+        verifyNoMoreInteractions(ftpsClientMock);
     }
 
     @Test
@@ -175,8 +194,19 @@
 
         FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
 
-        verify(ftpsClientMock, times(1)).logout();
         assertFalse(result.downloadSuccessful());
+        verify(ftpsClientMock).setNeedClientAuth(true);
+        verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+        verify(ftpsClientMock).setKeyManager(keyManagerMock);
+        verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
+        verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
+        verify(inputStreamMock, times(1)).close();
+        verify(trustManagerFactoryMock).init(keyStoreMock);
+        verify(ftpsClientMock).setTrustManager(trustManagerMock);
+        verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+        verify(ftpsClientMock).login(USERNAME, PASSWORD);
+        verify(ftpsClientMock, times(3)).isConnected();
+        verifyNoMoreInteractions(ftpsClientMock);
     }
 
     @Test
@@ -186,15 +216,27 @@
         when(keyStoreWrapperMock.getKeyStore()).thenReturn(keyStoreMock);
         when(trustManagerFactoryMock.getTrustManagers()).thenReturn(new TrustManager[] {trustManagerMock});
         when(ftpsClientMock.login(USERNAME, PASSWORD)).thenReturn(true);
-        when(ftpsClientMock.getReplyCode()).thenReturn(HttpStatus.BAD_REQUEST.value());
+        when(ftpsClientMock.getReplyCode()).thenReturn(FTPReply.BAD_COMMAND_SEQUENCE);
 
         ImmutableFileServerData fileServerData = ImmutableFileServerData.builder().serverAddress(XNF_ADDRESS)
                 .userId(USERNAME).password(PASSWORD).port(PORT).build();
 
         FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
 
-        verify(ftpsClientMock, times(1)).disconnect();
         assertFalse(result.downloadSuccessful());
+        verify(ftpsClientMock).setNeedClientAuth(true);
+        verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+        verify(ftpsClientMock).setKeyManager(keyManagerMock);
+        verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
+        verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
+        verify(inputStreamMock, times(1)).close();
+        verify(trustManagerFactoryMock).init(keyStoreMock);
+        verify(ftpsClientMock).setTrustManager(trustManagerMock);
+        verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+        verify(ftpsClientMock).login(USERNAME, PASSWORD);
+        verify(ftpsClientMock, times(2)).getReplyCode();
+        verify(ftpsClientMock, times(3)).isConnected();
+        verifyNoMoreInteractions(ftpsClientMock);
     }
 
     @Test
@@ -212,6 +254,17 @@
         FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
 
         assertFalse(result.downloadSuccessful());
+        verify(ftpsClientMock).setNeedClientAuth(true);
+        verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+        verify(ftpsClientMock).setKeyManager(keyManagerMock);
+        verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
+        verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
+        verify(inputStreamMock, times(1)).close();
+        verify(trustManagerFactoryMock).init(keyStoreMock);
+        verify(ftpsClientMock).setTrustManager(trustManagerMock);
+        verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+        verify(ftpsClientMock, times(3)).isConnected();
+        verifyNoMoreInteractions(ftpsClientMock);
     }
 
     @Test
@@ -232,6 +285,26 @@
 
         assertFalse(result.downloadSuccessful());
         verify(localFileMock, times(1)).delete();
+        verify(ftpsClientMock).setNeedClientAuth(true);
+        verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+        verify(ftpsClientMock).setKeyManager(keyManagerMock);
+        verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
+        verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
+        verify(inputStreamMock, times(1)).close();
+        verify(trustManagerFactoryMock).init(keyStoreMock);
+        verify(ftpsClientMock).setTrustManager(trustManagerMock);
+        verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+        verify(ftpsClientMock).login(USERNAME, PASSWORD);
+        verify(ftpsClientMock).getReplyCode();
+        verify(ftpsClientMock, times(1)).enterLocalPassiveMode();
+        verify(ftpsClientMock).execPBSZ(0);
+        verify(ftpsClientMock).execPROT("P");
+        verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE);
+        verify(ftpsClientMock).setBufferSize(1024 * 1024);
+        verify(localFileMock).setPath(LOCAL_FILE_PATH);
+        verify(localFileMock, times(1)).createNewFile();
+        verify(ftpsClientMock, times(2)).isConnected();
+        verifyNoMoreInteractions(ftpsClientMock);
     }
 
     @Test
@@ -254,5 +327,26 @@
         FileCollectResult result = clientUnderTest.collectFile(fileServerData, REMOTE_FILE_PATH, LOCAL_FILE_PATH);
 
         assertFalse(result.downloadSuccessful());
+        verify(ftpsClientMock).setNeedClientAuth(true);
+        verify(keyManagerUtilsMock).setCredentials(FTP_KEY_PATH, FTP_KEY_PASSWORD);
+        verify(ftpsClientMock).setKeyManager(keyManagerMock);
+        verify(fileResourceMock).setPath(TRUSTED_CA_PATH);
+        verify(keyStoreWrapperMock).load(inputStreamMock, TRUSTED_CA_PASSWORD.toCharArray());
+        verify(inputStreamMock, times(1)).close();
+        verify(trustManagerFactoryMock).init(keyStoreMock);
+        verify(ftpsClientMock).setTrustManager(trustManagerMock);
+        verify(ftpsClientMock).connect(XNF_ADDRESS, PORT);
+        verify(ftpsClientMock).login(USERNAME, PASSWORD);
+        verify(ftpsClientMock).getReplyCode();
+        verify(ftpsClientMock, times(1)).enterLocalPassiveMode();
+        verify(ftpsClientMock).execPBSZ(0);
+        verify(ftpsClientMock).execPROT("P");
+        verify(ftpsClientMock).setFileType(FTP.BINARY_FILE_TYPE);
+        verify(ftpsClientMock).setBufferSize(1024 * 1024);
+        verify(localFileMock).setPath(LOCAL_FILE_PATH);
+        verify(localFileMock, times(1)).createNewFile();
+        verify(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, osMock);
+        verify(ftpsClientMock, times(2)).isConnected();
+        verifyNoMoreInteractions(ftpsClientMock);
     }
-}
\ No newline at end of file
+}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
index d8296e1..bf2f73d 100644
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
+++ b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerReactiveHttpClientTest.java
@@ -19,7 +19,6 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import com.google.gson.JsonElement;
@@ -30,9 +29,16 @@
 import java.io.InputStream;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
@@ -40,13 +46,8 @@
 import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.collectors.datafile.model.ImmutableConsumerDmaapModel;
-import org.onap.dcaegen2.collectors.datafile.web.IRestTemplate;
-import org.springframework.http.HttpEntity;
+import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
 import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
 import org.springframework.web.util.DefaultUriBuilderFactory;
 
 import reactor.test.StepVerifier;
@@ -77,11 +78,12 @@
     private ConsumerDmaapModel consumerDmaapModel;
 
     private IFileSystemResource fileSystemResourceMock = mock(IFileSystemResource.class);
-    private IRestTemplate restTemplateMock = mock(IRestTemplate.class);
-    private ResponseEntity<String> responseEntityMock = mock(ResponseEntity.class);
+    private CloseableHttpAsyncClient clientMock;
+    private HttpResponse responseMock = mock(HttpResponse.class);
+    private Future<HttpResponse> futureMock = mock(Future.class);
+    private StatusLine statusLine = mock(StatusLine.class);
     private InputStream fileStream;
 
-
     @BeforeEach
     void setUp() {
         when(dmaapPublisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
@@ -111,59 +113,60 @@
 
         dmaapProducerReactiveHttpClient = new DmaapProducerReactiveHttpClient(dmaapPublisherConfigurationMock);
         dmaapProducerReactiveHttpClient.setFileSystemResource(fileSystemResourceMock);
-        dmaapProducerReactiveHttpClient.setRestTemplate(restTemplateMock);
+        clientMock = mock(CloseableHttpAsyncClient.class);
+        dmaapProducerReactiveHttpClient.setWebClient(clientMock);
     }
 
     @Test
     void getHttpResponse_Success() throws Exception {
         mockWebClientDependantObject(true);
 
-        StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
-                .expectNext(HttpStatus.OK.toString()).verifyComplete();
-
         URI expectedUri = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT)
                 .path(PUBLISH_TOPIC + URI_SEPARATOR + DEFAULT_FEED_ID + URI_SEPARATOR + FILE_NAME).build();
 
-        HttpHeaders headers = new HttpHeaders();
-
-        headers.setContentType(MediaType.parseMediaType(APPLICATION_OCTET_STREAM_CONTENT_TYPE));
+        HttpPut httpPut = new HttpPut();
+        httpPut.addHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_OCTET_STREAM_CONTENT_TYPE);
 
         JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(consumerDmaapModel));
         metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
         metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
-        headers.set(X_ATT_DR_META, metaData.toString());
+        httpPut.addHeader(X_ATT_DR_META, metaData.toString());
+        httpPut.setURI(expectedUri);
 
         String plainCreds = "dradmin" + ":" + "dradmin";
         byte[] plainCredsBytes = plainCreds.getBytes(StandardCharsets.ISO_8859_1);
         byte[] base64CredsBytes = Base64.encodeBase64(plainCredsBytes);
         String base64Creds = new String(base64CredsBytes);
-        headers.add("Authorization", "Basic " + base64Creds);
+        httpPut.addHeader("Authorization", "Basic " + base64Creds);
 
         fileStream.reset();
+        StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
+        .expectNext(responseMock.toString()).verifyComplete();
 
-        HttpEntity<byte[]> requestEntity = new HttpEntity<>(IOUtils.toByteArray(fileStream), headers);
         verify(fileSystemResourceMock).setPath("target/" + FILE_NAME);
-        verify(restTemplateMock).exchange(expectedUri, HttpMethod.PUT, requestEntity, String.class);
-        verifyNoMoreInteractions(restTemplateMock);
+        InputStream fileInputStream = fileSystemResourceMock.getInputStream();
+        httpPut.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
     }
 
     @Test
     void getHttpResponse_Fail() throws Exception {
         mockWebClientDependantObject(false);
-
         StepVerifier.create(dmaapProducerReactiveHttpClient.getDmaapProducerResponse(consumerDmaapModel))
                 .verifyComplete();
     }
 
-    private void mockWebClientDependantObject(boolean success) throws IOException {
+    private void mockWebClientDependantObject(boolean success)
+            throws IOException, InterruptedException, ExecutionException {
         fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes());
         when(fileSystemResourceMock.getInputStream()).thenReturn(fileStream);
-
         if (success) {
-            when(restTemplateMock.exchange(any(), any(), any(), any())).thenReturn(responseEntityMock);
-            when(responseEntityMock.getStatusCode()).thenReturn(HttpStatus.OK);
+            when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock);
+            when(futureMock.get()).thenReturn(responseMock);
+            when(responseMock.getStatusLine()).thenReturn(statusLine);
+            when(statusLine.getStatusCode()).thenReturn(HttpUtils.SC_OK);
         } else {
-            when(restTemplateMock.exchange(any(), any(), any(), any())).thenThrow(new RuntimeException());
+            when(clientMock.execute(any(HttpPut.class), any())).thenReturn(futureMock);
+            when(futureMock.get()).thenThrow(new InterruptedException());
         }
     }
 }
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptorTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptorTest.java
deleted file mode 100644
index b0f5c93..0000000
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RequestResponseLoggingInterceptorTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.web;
-
-
-import org.junit.jupiter.api.Test;
-import org.springframework.http.HttpRequest;
-import org.springframework.http.client.ClientHttpRequestExecution;
-import org.springframework.http.client.ClientHttpResponse;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-class RequestResponseLoggingInterceptorTest {
-
-    @Test
-    void intercept_shouldReturnObject() throws URISyntaxException, IOException {
-
-        //given
-        RequestResponseLoggingInterceptor requestResponseLoggingInterceptor = new RequestResponseLoggingInterceptor();
-
-        ClientHttpRequestExecution execution = mock(ClientHttpRequestExecution.class);
-        HttpRequest request = mock(HttpRequest.class);
-        ClientHttpResponse response = mock(ClientHttpResponse.class);
-
-        byte[] BODY = new byte[] { (byte)0xe0, 0x4f, (byte)0xd0, 0x20, (byte)0xa2 };
-        URI uri = new URI("www.someuri.com");
-
-        //when
-        when(execution.execute(request, BODY)).thenReturn(response);
-
-        //then
-        assertNotNull(requestResponseLoggingInterceptor.intercept(request, BODY, execution));
-    }
-}
diff --git a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapperTest.java b/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapperTest.java
deleted file mode 100644
index 3a0701f..0000000
--- a/datafile-dmaap-client/src/test/java/org/onap/dcaegen2/collectors/datafile/web/RestTemplateWrapperTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.web;
-
-import org.junit.jupiter.api.Test;
-
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-
-class RestTemplateWrapperTest {
-
-    @Test
-    void constructor_shouldReturnNotNullObject() throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
-        RestTemplateWrapper restTemplateWrapper = new RestTemplateWrapper();
-        assertNotNull(restTemplateWrapper);
-    }
-}
diff --git a/pom.xml b/pom.xml
index 6f8e7ed..12bcf6a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@
 
   <groupId>org.onap.dcaegen2.collectors</groupId>
   <artifactId>datafile</artifactId>
-  <version>1.1.0-SNAPSHOT</version>
+  <version>1.1.1-SNAPSHOT</version>
 
   <name>dcaegen2-collectors.datafile</name>
   <description>datafile collector</description>
@@ -46,7 +46,7 @@
   <properties>
     <java.version>8</java.version>
     <immutable.version>2.7.1</immutable.version>
-    <spring.version>5.1.0.RELEASE</spring.version>
+    <spring.version>5.1.2.RELEASE</spring.version>
     <spring-boot.version>2.1.0.M4</spring-boot.version>
     <tomcat.version>8.5.34</tomcat.version>
     <docker.maven.version>1.0.0</docker.maven.version>
@@ -136,6 +136,16 @@
  <dependencyManagement>
     <dependencies>
       <dependency>
+        <groupId>org.asynchttpclient</groupId>
+        <artifactId>async-http-client</artifactId>
+        <version>2.6.0</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.httpcomponents</groupId>
+        <artifactId>httpasyncclient</artifactId>
+        <version>4.1.4</version>
+      </dependency>
+      <dependency>
         <groupId>io.projectreactor</groupId>
         <artifactId>reactor-bom</artifactId>
         <version>Bismuth-SR10</version>
diff --git a/version.properties b/version.properties
index 73415a7..11ad59d 100644
--- a/version.properties
+++ b/version.properties
@@ -1,6 +1,6 @@
 major=1

 minor=1

-patch=0

+patch=1

 base_version=${major}.${minor}.${patch}

 release_version=${base_version}

 snapshot_version=${base_version}-SNAPSHOT