Added locking

This prevents the same file from being downloaded concurrently by several
DFS instances. While a file is being downloaded, a lock is held; the lock
is released when the download succeeds or finally fails.

Change-Id: I38db3565de37125de668cbab3f91d4f896cb0932
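
A rough sketch of the intended lock lifecycle, using the DataStore API
introduced below (download() and localPath() are placeholders; the ".lck"
suffix matches lockName() in CollectAndReportFiles):

    DataStore store = DataStore.create(config);
    String lock = fileData.name() + ".lck";
    store.createLock(lock)                    // Mono<Boolean>: true only for the winning instance
        .filter(granted -> granted)           // losing instances complete empty and skip the file
        .flatMap(granted -> download(fileData))
        .flatMap(info -> store.copyFileTo(localPath(info), info.getName()))
        .doOnNext(ok -> store.deleteLock(lock).subscribe())   // release on success
        .doOnError(err -> store.deleteLock(lock).subscribe()) // release on failure
        .subscribe();
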
diff --git a/datafile/config/application.yaml b/datafile/config/application.yaml
index cf62d7b..71f3172 100644
--- a/datafile/config/application.yaml
+++ b/datafile/config/application.yaml
@@ -50,7 +50,7 @@
     accessKeyId:
     secretAccessKey:
     bucket:
-
+    locksBucket:
 springdoc:
   show-actuator: true
   swagger-ui.disable-swagger-default-url: true
\ No newline at end of file
diff --git a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index 79e5910..f8be04d 100644
--- a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -92,8 +92,15 @@
     @Value("${app.s3.bucket:}")
     private String s3Bucket;
 
+    @Value("${app.s3.locksBucket:}")
+    private String s3LocksBucket;
+
+    public String getS3LocksBucket() {
+        return s3LocksBucket.isEmpty() ? s3Bucket : s3LocksBucket;
+    }
+
     public boolean isS3Enabled() {
-        return !s3EndpointOverride.isEmpty();
+        return !s3EndpointOverride.isEmpty() && !s3Bucket.isEmpty();
     }
 
     public String getKafkaBootStrapServers() {
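
The new locksBucket setting is optional; getS3LocksBucket() falls back to
the data bucket when it is left empty, so existing configurations keep
working. A minimal sketch of the fallback, with hypothetical values:

    String s3Bucket = "ropfiles";   // app.s3.bucket
    String s3LocksBucket = "";      // app.s3.locksBucket left unset
    String effective = s3LocksBucket.isEmpty() ? s3Bucket : s3LocksBucket;
    // effective == "ropfiles": locks share the data bucket unless a
    // dedicated locks bucket is configured
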
diff --git a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/DataStore.java b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/DataStore.java
index c856898..af0512e 100644
--- a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/DataStore.java
+++ b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/DataStore.java
@@ -20,6 +20,10 @@
 
 package org.onap.dcaegen2.collectors.datafile.datastore;
 
+import java.nio.file.Path;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -28,11 +32,9 @@
         FILES, LOCKS
     }
 
-    public Flux<String> listFiles(Bucket bucket, String prefix);
+    public Flux<String> listObjects(Bucket bucket, String prefix);
 
-    public Mono<String> readFile(Bucket bucket, String fileName);
-
-    public Mono<String> readFile(String bucket, String fileName);
+    public Mono<byte[]> readObject(Bucket bucket, String name);
 
     public Mono<Boolean> createLock(String name);
 
@@ -40,4 +42,16 @@
 
     public Mono<Boolean> deleteObject(Bucket bucket, String name);
 
+    public Mono<String> copyFileTo(Path from, String to);
+
+    public Mono<String> create(DataStore.Bucket bucket);
+
+    public Mono<String> deleteBucket(Bucket bucket);
+
+    public Mono<Boolean> fileExists(Bucket bucket, String key);
+
+    public static DataStore create(AppConfig config) {
+        return config.isS3Enabled() ? new S3ObjectStore(config) : new FileStore(config);
+    }
+
 }
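
With the static factory, callers code against the interface and no longer
branch on isS3Enabled(). A usage sketch, assuming a populated AppConfig
named config and a hypothetical object key:

    DataStore store = DataStore.create(config);   // S3ObjectStore if S3 is enabled, else FileStore
    Flux<String> names = store.listObjects(Bucket.FILES, "");             // empty prefix lists all
    Mono<byte[]> data = store.readObject(Bucket.FILES, "A20220101.xml");  // hypothetical key
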
diff --git a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/FileStore.java b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/FileStore.java
index fb1948a..7f497be 100644
--- a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/FileStore.java
+++ b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/FileStore.java
@@ -21,6 +21,7 @@
 package org.onap.dcaegen2.collectors.datafile.datastore;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
@@ -29,12 +30,15 @@
 import java.util.stream.Stream;
 
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.FileSystemUtils;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class FileStore implements DataStore {
+    private static final Logger logger = LoggerFactory.getLogger(FileStore.class);
 
     AppConfig applicationConfig;
 
@@ -43,12 +47,14 @@
     }
 
     @Override
-    public Flux<String> listFiles(Bucket bucket, String prefix) {
+    public Flux<String> listObjects(Bucket bucket, String prefix) {
         Path root = Path.of(applicationConfig.collectedFilesPath, prefix);
         if (!root.toFile().exists()) {
             root = root.getParent();
         }
 
+        logger.debug("Listing files in: {}", root);
+
         List<String> result = new ArrayList<>();
         try (Stream<Path> stream = Files.walk(root, Integer.MAX_VALUE)) {
 
@@ -63,22 +69,24 @@
     private void filterListFiles(Path path, String prefix, List<String> result) {
         if (path.toFile().isFile() && externalName(path).startsWith(prefix)) {
             result.add(externalName(path));
+        } else {
+            logger.debug("Ignoring {}: not a regular file or does not match prefix {}", path, prefix);
         }
     }
 
-    private String externalName(Path f) {
-        String fullName = f.toString();
-        return fullName.substring(applicationConfig.collectedFilesPath.length());
-    }
-
-    public Mono<String> readFile(String bucket, String fileName) {
-        return Mono.error(new DatafileTaskException("readFile from bucket Not implemented"));
+    private String externalName(Path path) {
+        String fullName = path.toString();
+        String externalName = fullName.substring(applicationConfig.collectedFilesPath.length());
+        if (externalName.startsWith("/")) {
+            externalName = externalName.substring(1);
+        }
+        return externalName;
     }
 
     @Override
-    public Mono<String> readFile(Bucket bucket, String fileName) {
+    public Mono<byte[]> readObject(Bucket bucket, String fileName) {
         try {
-            String contents = Files.readString(path(fileName));
+            byte[] contents = Files.readAllBytes(path(fileName));
             return Mono.just(contents);
         } catch (Exception e) {
             return Mono.error(e);
@@ -89,13 +97,16 @@
     public Mono<Boolean> createLock(String name) {
         File file = path(name).toFile();
         try {
+            Files.createDirectories(path(name).getParent());
             boolean res = file.createNewFile();
             return Mono.just(res);
         } catch (Exception e) {
-            return Mono.just(file.exists());
+            logger.warn("Could not create lock file: {}, reason: {}", file.getPath(), e.getMessage());
+            return Mono.just(!file.exists());
         }
     }
 
+    @Override
     public Mono<String> copyFileTo(Path from, String to) {
         try {
             Path toPath = path(to);
@@ -122,8 +133,28 @@
         }
     }
 
+    @Override
+    public Mono<String> create(Bucket bucket) {
+        return Mono.just("OK");
+    }
+
     private Path path(String name) {
         return Path.of(applicationConfig.collectedFilesPath, name);
     }
 
+    public Mono<Boolean> fileExists(Bucket bucket, String key) {
+        return Mono.just(path(key).toFile().exists());
+    }
+
+    @Override
+    public Mono<String> deleteBucket(Bucket bucket) {
+        try {
+            FileSystemUtils.deleteRecursively(Path.of(applicationConfig.collectedFilesPath));
+        } catch (IOException e) {
+            logger.debug("Could not delete directory: {}, reason; {}", applicationConfig.collectedFilesPath,
+                e.getMessage());
+        }
+        return Mono.just("OK");
+    }
+
 }
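
The file-based lock relies on the atomicity of File.createNewFile(), which
returns false if the file already exists, so DFS instances sharing the same
volume race safely. A minimal sketch of that contract (the path is
hypothetical and error handling is elided; FileStore resolves lock names
under collectedFilesPath):

    File lock = new File("/tmp/A20220101.xml.lck");
    if (lock.createNewFile()) {        // atomic: false if another instance already holds it
        try {
            // ... download the file ...
        } finally {
            lock.delete();             // release so the file can be locked again
        }
    }
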
diff --git a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/S3ObjectStore.java b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/S3ObjectStore.java
index 811b9b7..f93bbaf 100644
--- a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/S3ObjectStore.java
+++ b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/S3ObjectStore.java
@@ -21,7 +21,6 @@
 package org.onap.dcaegen2.collectors.datafile.datastore;
 
 import java.net.URI;
-import java.nio.charset.Charset;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -35,6 +34,7 @@
 import reactor.core.publisher.Mono;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.BytesWrapper;
 import software.amazon.awssdk.core.ResponseBytes;
 import software.amazon.awssdk.core.async.AsyncRequestBody;
 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
@@ -63,12 +63,12 @@
 
 public class S3ObjectStore implements DataStore {
     private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
-    private final AppConfig appConfig;
+    private final AppConfig applicationConfig;
 
     private static S3AsyncClient s3AsynchClient;
 
     public S3ObjectStore(AppConfig applicationConfig) {
-        this.appConfig = applicationConfig;
+        this.applicationConfig = applicationConfig;
 
         getS3AsynchClient(applicationConfig);
     }
@@ -88,11 +88,10 @@
             .credentialsProvider(StaticCredentialsProvider.create( //
                 AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
                     applicationConfig.getS3SecretAccessKey())));
-
     }
 
     @Override
-    public Flux<String> listFiles(Bucket bucket, String prefix) {
+    public Flux<String> listObjects(Bucket bucket, String prefix) {
         return listObjectsInBucket(bucket(bucket), prefix).map(S3Object::key);
     }
 
@@ -102,11 +101,6 @@
             .onErrorResume(t -> createLock(name, null));
     }
 
-    public Mono<Boolean> fileExists(Bucket bucket, String key) {
-        return this.getHeadObject(bucket(bucket), key).map(obj -> true) //
-            .onErrorResume(t -> Mono.just(false));
-    }
-
     private Mono<Boolean> createLock(String name, HeadObjectResponse head) {
         if (head == null) {
 
@@ -138,15 +132,10 @@
     }
 
     @Override
-    public Mono<String> readFile(Bucket bucket, String fileName) {
+    public Mono<byte[]> readObject(Bucket bucket, String fileName) {
         return getDataFromS3Object(bucket(bucket), fileName);
     }
 
-    @Override
-    public Mono<String> readFile(String bucket, String fileName) {
-        return getDataFromS3Object(bucket, fileName);
-    }
-
     public Mono<String> putObject(Bucket bucket, String fileName, String bodyString) {
         PutObjectRequest request = PutObjectRequest.builder() //
             .bucket(bucket(bucket)) //
@@ -162,11 +151,18 @@
             .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
     }
 
-    public Mono<String> copyFileToS3(Bucket bucket, Path fromFile, String toFile) {
-        return copyFileToS3Bucket(bucket(bucket), fromFile, toFile);
+    @Override
+    public Mono<String> copyFileTo(Path fromFile, String toFile) {
+        return copyFileToS3Bucket(bucket(Bucket.FILES), fromFile, toFile);
     }
 
-    public Mono<String> createS3Bucket(Bucket bucket) {
+    public Mono<Boolean> fileExists(Bucket bucket, String key) {
+        return this.getHeadObject(bucket(bucket), key).map(obj -> true) //
+            .onErrorResume(t -> Mono.just(false));
+    }
+
+    @Override
+    public Mono<String> create(Bucket bucket) {
         return createS3Bucket(bucket(bucket));
     }
 
@@ -180,10 +176,11 @@
 
         return Mono.fromFuture(future) //
             .map(f -> s3Bucket) //
-            .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
+            .doOnError(t -> logger.trace("Could not create S3 bucket: {}", t.getMessage()))
             .onErrorResume(t -> Mono.just(s3Bucket));
     }
 
+    @Override
     public Mono<String> deleteBucket(Bucket bucket) {
         return deleteAllFiles(bucket) //
             .collectList() //
@@ -191,7 +188,6 @@
             .map(resp -> "OK")
             .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(bucket), t.getMessage()))
             .onErrorResume(t -> Mono.just("NOK"));
-
     }
 
     private Flux<DeleteObjectsResponse> deleteAllFiles(Bucket bucket) {
@@ -269,7 +265,7 @@
     }
 
     private String bucket(Bucket bucket) {
-        return bucket == Bucket.FILES ? appConfig.getS3Bucket() : appConfig.getS3Bucket();
+        return bucket == Bucket.FILES ? applicationConfig.getS3Bucket() : applicationConfig.getS3LocksBucket();
     }
 
     private Mono<String> copyFileToS3Bucket(String s3Bucket, Path fileName, String s3Key) {
@@ -296,7 +292,7 @@
         return Mono.fromFuture(future);
     }
 
-    private Mono<String> getDataFromS3Object(String bucket, String key) {
+    private Mono<byte[]> getDataFromS3Object(String bucket, String key) {
 
         GetObjectRequest request = GetObjectRequest.builder() //
             .bucket(bucket) //
@@ -307,9 +303,10 @@
             s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
 
         return Mono.fromFuture(future) //
-            .map(b -> new String(b.asByteArray(), Charset.defaultCharset())) //
-            .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) //
-            .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
+            .map(BytesWrapper::asByteArray) //
+            .doOnError(
+                t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key, bucket, t.getMessage())) //
+            .doOnNext(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
             .onErrorResume(t -> Mono.empty());
     }
 
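
On S3, createLock() is optimistic: it checks for the lock object with a HEAD
request and creates the object only when the check fails. A hedged sketch of
the idea with the AWS SDK v2 (locks and lockKey are placeholders, and this is
not the exact code above; note that HEAD-then-PUT leaves a small window in
which two instances can both see a miss):

    HeadObjectRequest head = HeadObjectRequest.builder().bucket(locks).key(lockKey).build();
    Mono<Boolean> granted = Mono.fromFuture(s3AsynchClient.headObject(head))
        .map(rsp -> false)                          // lock object exists: someone else holds it
        .onErrorResume(t -> Mono.fromFuture(s3AsynchClient.putObject(
                PutObjectRequest.builder().bucket(locks).key(lockKey).build(),
                AsyncRequestBody.fromString("")))   // create a zero-length lock object
            .map(rsp -> true));
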
diff --git a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java
index 6b9210f..2323d0e 100644
--- a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java
+++ b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java
@@ -21,6 +21,7 @@
 package org.onap.dcaegen2.collectors.datafile.model;
 
 import java.time.Instant;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Various counters that can be shown via a REST API.
@@ -40,6 +41,8 @@
     private long totalReceivedEvents = 0;
     private Instant lastEventTime = Instant.MIN;
 
+    public final AtomicInteger threadPoolQueueSize = new AtomicInteger();
+
     public synchronized void incNoOfReceivedEvents() {
         totalReceivedEvents++;
         lastEventTime = Instant.now();
diff --git a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java
index c9f5028..2283418 100644
--- a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java
+++ b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java
@@ -25,7 +25,6 @@
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -33,8 +32,8 @@
 import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
 import org.onap.dcaegen2.collectors.datafile.configuration.CertificateConfig;
+import org.onap.dcaegen2.collectors.datafile.datastore.DataStore;
 import org.onap.dcaegen2.collectors.datafile.datastore.DataStore.Bucket;
-import org.onap.dcaegen2.collectors.datafile.datastore.S3ObjectStore;
 import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
 import org.onap.dcaegen2.collectors.datafile.http.HttpsClientConnectionManagerUtil;
 import org.onap.dcaegen2.collectors.datafile.model.Counters;
@@ -76,13 +75,12 @@
     private static final Logger logger = LoggerFactory.getLogger(CollectAndReportFiles.class);
 
     private final AppConfig appConfig;
-    private final AtomicInteger threadPoolQueueSize = new AtomicInteger();
 
     private Counters counters = new Counters();
 
     private final KafkaSender<String, String> kafkaSender;
 
-    private final S3ObjectStore s3ObjectStore;
+    private final DataStore dataStore;
 
     /**
      * Constructor for task registration in Datafile Workflow.
@@ -94,11 +92,9 @@
         this.appConfig = applicationConfiguration;
         this.kafkaSender = KafkaSender.create(kafkaSenderOptions());
         initCerts();
-        if (applicationConfiguration.isS3Enabled()) {
-            this.s3ObjectStore = new S3ObjectStore(appConfig);
-        } else {
-            this.s3ObjectStore = null;
-        }
+
+        this.dataStore = DataStore.create(applicationConfiguration);
+
         start();
     }
 
@@ -123,7 +119,8 @@
         try {
             logger.trace("Starting");
             if (appConfig.isS3Enabled()) {
-                tryCreateS3Bucket(appConfig.getS3Bucket()).subscribe();
+                this.dataStore.create(Bucket.FILES).subscribe();
+                this.dataStore.create(Bucket.LOCKS).subscribe();
             }
             Thread.sleep(delayMillis);
             createMainTask().subscribe(null, s -> start(2000), null);
@@ -133,40 +130,35 @@
         }
     }
 
-    private Mono<String> tryCreateS3Bucket(String s3Bucket) {
-        return this.s3ObjectStore.createS3Bucket(Bucket.FILES) //
-            .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage())) //
-            .onErrorResume(t -> Mono.just(s3Bucket));
-    }
-
     Flux<FilePublishInformation> createMainTask() {
         Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
         return fetchFromKafka() //
-            .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) //
+            .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.incrementAndGet()) //
             .doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) //
             .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
             .runOn(scheduler) //
-            .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) //
+            .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.decrementAndGet()) //
             .flatMap(fileReadyMessage -> Flux.fromIterable(FileData.createFileData(fileReadyMessage)), true, 1) //
             .flatMap(this::filterNotFetched, false, 1, 1) //
             .flatMap(this::fetchFile, false, 1, 1) //
-            .flatMap(this::moveFileToS3Bucket, false, 1) //
             .flatMap(data -> reportFetchedFile(data, this.appConfig.collectedFileTopic), false, 1) //
             .sequential() //
             .doOnError(t -> logger.error("Received error: {}", t.toString())); //
     }
 
+    private Mono<FileData> deleteLock(FileData info) {
+        return dataStore.deleteLock(lockName(info.name())).map(b -> info); //
+    }
+
     private Mono<FilePublishInformation> moveFileToS3Bucket(FilePublishInformation info) {
         if (this.appConfig.isS3Enabled()) {
-
-            return s3ObjectStore.copyFileToS3(Bucket.FILES, locaFilePath(info), info.getName())
+            return dataStore.copyFileTo(locaFilePath(info), info.getName())
                 .doOnError(t -> logger.warn("Failed to store file '{}' in S3 {}", info.getName(), t.getMessage())) //
                 .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(1000))) //
                 .map(f -> info) //
                 .doOnError(t -> logger.error("Failed to store file '{}' in S3 after retries {}", info.getName(),
                     t.getMessage())) //
                 .doOnNext(n -> logger.debug("Stored file in S3: {}", info.getName())) //
-                .onErrorResume(t -> Mono.empty()) //
                 .doOnNext(sig -> deleteLocalFile(info));
         } else {
             return Mono.just(info);
@@ -174,17 +166,19 @@
     }
 
     private Mono<FileData> filterNotFetched(FileData fileData) {
-        Path path = fileData.getLocalFilePath(this.appConfig);
-        if (this.appConfig.isS3Enabled()) {
-            return s3ObjectStore.fileExists(Bucket.FILES, fileData.name()) //
-                .filter(exists -> !exists).filter(exists -> !path.toFile().exists()).map(f -> fileData); //
+        Path localPath = fileData.getLocalFilePath(this.appConfig);
 
-        } else {
-            return path.toFile().exists() ? Mono.empty() : Mono.just(fileData);
-        }
+        return dataStore.fileExists(Bucket.FILES, fileData.name()) //
+            .filter(exists -> !exists) //
+            .filter(exists -> !localPath.toFile().exists()) //
+            .map(f -> fileData); //
 
     }
 
+    private String lockName(String fileName) {
+        return fileName + ".lck";
+    }
+
     private Path locaFilePath(FilePublishInformation info) {
         return Paths.get(this.appConfig.collectedFilesPath, info.getName());
     }
@@ -239,14 +233,15 @@
         return new FileCollector(appConfig, counters);
     }
 
-    int getThreadPoolQueueSize() {
-        return this.threadPoolQueueSize.get();
-    }
-
-    protected Mono<FilePublishInformation> fetchFile(FileData fileData) {
-        return createFileCollector() //
-            .collectFile(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT) //
-            .onErrorResume(exception -> handleFetchFileFailure(fileData, exception));
+    private Mono<FilePublishInformation> fetchFile(FileData fileData) {
+        return this.dataStore.createLock(lockName(fileData.name())).filter(granted -> granted) //
+            .map(granted -> createFileCollector()) //
+            .flatMap(collector -> collector.collectFile(fileData, FILE_TRANSFER_MAX_RETRIES,
+                FILE_TRANSFER_INITIAL_RETRY_TIMEOUT)) //
+            .flatMap(this::moveFileToS3Bucket) //
+            .doOnNext(b -> deleteLock(fileData).subscribe()) //
+            .doOnError(b -> deleteLock(fileData).subscribe()) //
+            .onErrorResume(exception -> handleFetchFileFailure(fileData, exception)); //
     }
 
     private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Throwable t) {
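
The rewritten fetchFile() gates the whole per-file pipeline on the lock and
releases the lock on both outcomes. Note also that moveFileToS3Bucket() lost
its onErrorResume: a copy that still fails after all retries now propagates,
so doOnError releases the lock and the failure is counted. A sketch of the
retry policy (reactor-core; localPath and name are placeholders):

    Mono<String> stored = dataStore.copyFileTo(localPath, name)
        .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(1000)));  // up to 10 retries, 1 s apart
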
diff --git a/datafile/src/test/java/org/onap/dcaegen2/collectors/datafile/MockDatafile.java b/datafile/src/test/java/org/onap/dcaegen2/collectors/datafile/MockDatafile.java
index a2b793c..02f8882 100644
--- a/datafile/src/test/java/org/onap/dcaegen2/collectors/datafile/MockDatafile.java
+++ b/datafile/src/test/java/org/onap/dcaegen2/collectors/datafile/MockDatafile.java
@@ -28,6 +28,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
@@ -37,8 +38,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.datastore.DataStore;
 import org.onap.dcaegen2.collectors.datafile.datastore.DataStore.Bucket;
-import org.onap.dcaegen2.collectors.datafile.datastore.S3ObjectStore;
 import org.onap.dcaegen2.collectors.datafile.model.Counters;
 import org.onap.dcaegen2.collectors.datafile.model.FileData;
 import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
@@ -74,8 +75,8 @@
         "app.s3.endpointOverride=http://localhost:9000", //
         "app.s3.accessKeyId=minio", //
         "app.s3.secretAccessKey=miniostorage", //
-        "app.s3.bucket=ropfiles" //
-    })
+        "app.s3.bucket=ropfiles", //
+        "app.s3.locksBucket=locks"})
 @SuppressWarnings("squid:S3577") // Not containing any tests since it is a mock.
 class MockDatafile {
 
@@ -154,16 +155,16 @@
         }
     }
 
-    static class CollectAndReportFilesMock extends CollectAndReportFiles {
+    static class FileCollectorMock extends FileCollector {
         final AppConfig appConfig;
 
-        public CollectAndReportFilesMock(AppConfig appConfig) {
-            super(appConfig);
+        public FileCollectorMock(AppConfig appConfig) {
+            super(appConfig, new Counters());
             this.appConfig = appConfig;
         }
 
         @Override // (override fetchFile to disable the actual file fetching)
-        protected Mono<FilePublishInformation> fetchFile(FileData fileData) {
+        public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff) {
             FileCollector fc = new FileCollector(this.appConfig, new Counters());
             FilePublishInformation i = fc.createFilePublishInformation(fileData);
 
@@ -179,6 +180,20 @@
         }
     }
 
+    static class CollectAndReportFilesMock extends CollectAndReportFiles {
+        final AppConfig appConfig;
+
+        public CollectAndReportFilesMock(AppConfig appConfig) {
+            super(appConfig);
+            this.appConfig = appConfig;
+        }
+
+    @Override // (return a mock FileCollector so that no actual file fetching is done)
+        protected FileCollector createFileCollector() {
+            return new FileCollectorMock(appConfig);
+        }
+    }
+
     @TestConfiguration
     static class TestBeanFactory {
 
@@ -199,8 +214,9 @@
 
     @AfterEach
     void afterEach() {
-        S3ObjectStore store = new S3ObjectStore(this.appConfig);
+        DataStore store = DataStore.create(this.appConfig);
         store.deleteBucket(Bucket.FILES).block();
+        store.deleteBucket(Bucket.LOCKS).block();
         deleteAllFiles();
 
     }
@@ -241,7 +257,7 @@
     void testS3Concurrency() throws Exception {
         waitForKafkaListener();
 
-        final int NO_OF_OBJECTS = 10000;
+        final int NO_OF_OBJECTS = 10;
 
         Instant startTime = Instant.now();