Added locking
This avoids that the same file is concurrently downloaded from several DFS instances.
While a file is downloaded, a lock is created.
Change-Id: I38db3565de37125de668cbab3f91d4f896cb0932
diff --git a/datafile/config/application.yaml b/datafile/config/application.yaml
index cf62d7b..71f3172 100644
--- a/datafile/config/application.yaml
+++ b/datafile/config/application.yaml
@@ -50,7 +50,7 @@
accessKeyId:
secretAccessKey:
bucket:
-
+ locksBucket:
springdoc:
show-actuator: true
swagger-ui.disable-swagger-default-url: true
\ No newline at end of file
diff --git a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index 79e5910..f8be04d 100644
--- a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -92,8 +92,15 @@
@Value("${app.s3.bucket:}")
private String s3Bucket;
+ @Value("${app.s3.locksBucket:}")
+ private String s3LocksBucket;
+
+ public String getS3LocksBucket() {
+ return s3LocksBucket.isEmpty() ? s3Bucket : s3LocksBucket;
+ }
+
public boolean isS3Enabled() {
- return !s3EndpointOverride.isEmpty();
+ return !s3EndpointOverride.isEmpty() && !s3Bucket.isEmpty();
}
public String getKafkaBootStrapServers() {
diff --git a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/DataStore.java b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/DataStore.java
index c856898..af0512e 100644
--- a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/DataStore.java
+++ b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/DataStore.java
@@ -20,6 +20,10 @@
package org.onap.dcaegen2.collectors.datafile.datastore;
+import java.nio.file.Path;
+
+import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -28,11 +32,9 @@
FILES, LOCKS
}
- public Flux<String> listFiles(Bucket bucket, String prefix);
+ public Flux<String> listObjects(Bucket bucket, String prefix);
- public Mono<String> readFile(Bucket bucket, String fileName);
-
- public Mono<String> readFile(String bucket, String fileName);
+ public Mono<byte[]> readObject(Bucket bucket, String name);
public Mono<Boolean> createLock(String name);
@@ -40,4 +42,16 @@
public Mono<Boolean> deleteObject(Bucket bucket, String name);
+ public Mono<String> copyFileTo(Path from, String to);
+
+ public Mono<String> create(DataStore.Bucket bucket);
+
+ public Mono<String> deleteBucket(Bucket bucket);
+
+ public Mono<Boolean> fileExists(Bucket bucket, String key);
+
+ public static DataStore create(AppConfig config) {
+ return config.isS3Enabled() ? new S3ObjectStore(config) : new FileStore(config);
+ }
+
}
diff --git a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/FileStore.java b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/FileStore.java
index fb1948a..7f497be 100644
--- a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/FileStore.java
+++ b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/FileStore.java
@@ -21,6 +21,7 @@
package org.onap.dcaegen2.collectors.datafile.datastore;
import java.io.File;
+import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
@@ -29,12 +30,15 @@
import java.util.stream.Stream;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
-import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.FileSystemUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class FileStore implements DataStore {
+ private static final Logger logger = LoggerFactory.getLogger(java.lang.invoke.MethodHandles.lookup().lookupClass());
AppConfig applicationConfig;
@@ -43,12 +47,14 @@
}
@Override
- public Flux<String> listFiles(Bucket bucket, String prefix) {
+ public Flux<String> listObjects(Bucket bucket, String prefix) {
Path root = Path.of(applicationConfig.collectedFilesPath, prefix);
if (!root.toFile().exists()) {
root = root.getParent();
}
+ logger.debug("Listing files in: {}", root);
+
List<String> result = new ArrayList<>();
try (Stream<Path> stream = Files.walk(root, Integer.MAX_VALUE)) {
@@ -63,22 +69,24 @@
private void filterListFiles(Path path, String prefix, List<String> result) {
if (path.toFile().isFile() && externalName(path).startsWith(prefix)) {
result.add(externalName(path));
+ } else {
+ logger.debug("Ignoring file {} that does not start with: {}", path, prefix);
}
}
- private String externalName(Path f) {
- String fullName = f.toString();
- return fullName.substring(applicationConfig.collectedFilesPath.length());
- }
-
- public Mono<String> readFile(String bucket, String fileName) {
- return Mono.error(new DatafileTaskException("readFile from bucket Not implemented"));
+ private String externalName(Path path) {
+ String fullName = path.toString();
+ String externalName = fullName.substring(applicationConfig.collectedFilesPath.length());
+ if (externalName.startsWith("/")) {
+ externalName = externalName.substring(1);
+ }
+ return externalName;
}
@Override
- public Mono<String> readFile(Bucket bucket, String fileName) {
+ public Mono<byte[]> readObject(Bucket bucket, String fileName) {
try {
- String contents = Files.readString(path(fileName));
+ byte[] contents = Files.readAllBytes(path(fileName));
return Mono.just(contents);
} catch (Exception e) {
return Mono.error(e);
@@ -89,13 +97,16 @@
public Mono<Boolean> createLock(String name) {
File file = path(name).toFile();
try {
+ Files.createDirectories(path(name).getParent());
boolean res = file.createNewFile();
return Mono.just(res);
} catch (Exception e) {
- return Mono.just(file.exists());
+ logger.warn("Could not create lock file: {}, reason: {}", file.getPath(), e.getMessage());
+ return Mono.just(!file.exists());
}
}
+ @Override
public Mono<String> copyFileTo(Path from, String to) {
try {
Path toPath = path(to);
@@ -122,8 +133,28 @@
}
}
+ @Override
+ public Mono<String> create(Bucket bucket) {
+ return Mono.just("OK");
+ }
+
private Path path(String name) {
return Path.of(applicationConfig.collectedFilesPath, name);
}
+ public Mono<Boolean> fileExists(Bucket bucket, String key) {
+ return Mono.just(path(key).toFile().exists());
+ }
+
+ @Override
+ public Mono<String> deleteBucket(Bucket bucket) {
+ try {
+ FileSystemUtils.deleteRecursively(Path.of(applicationConfig.collectedFilesPath));
+ } catch (IOException e) {
+ logger.debug("Could not delete directory: {}, reason; {}", applicationConfig.collectedFilesPath,
+ e.getMessage());
+ }
+ return Mono.just("OK");
+ }
+
}
diff --git a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/S3ObjectStore.java b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/S3ObjectStore.java
index 811b9b7..f93bbaf 100644
--- a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/S3ObjectStore.java
+++ b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/datastore/S3ObjectStore.java
@@ -21,7 +21,6 @@
package org.onap.dcaegen2.collectors.datafile.datastore;
import java.net.URI;
-import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
@@ -35,6 +34,7 @@
import reactor.core.publisher.Mono;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.BytesWrapper;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
@@ -63,12 +63,12 @@
public class S3ObjectStore implements DataStore {
private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
- private final AppConfig appConfig;
+ private final AppConfig applicationConfig;
private static S3AsyncClient s3AsynchClient;
public S3ObjectStore(AppConfig applicationConfig) {
- this.appConfig = applicationConfig;
+ this.applicationConfig = applicationConfig;
getS3AsynchClient(applicationConfig);
}
@@ -88,11 +88,10 @@
.credentialsProvider(StaticCredentialsProvider.create( //
AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
applicationConfig.getS3SecretAccessKey())));
-
}
@Override
- public Flux<String> listFiles(Bucket bucket, String prefix) {
+ public Flux<String> listObjects(Bucket bucket, String prefix) {
return listObjectsInBucket(bucket(bucket), prefix).map(S3Object::key);
}
@@ -102,11 +101,6 @@
.onErrorResume(t -> createLock(name, null));
}
- public Mono<Boolean> fileExists(Bucket bucket, String key) {
- return this.getHeadObject(bucket(bucket), key).map(obj -> true) //
- .onErrorResume(t -> Mono.just(false));
- }
-
private Mono<Boolean> createLock(String name, HeadObjectResponse head) {
if (head == null) {
@@ -138,15 +132,10 @@
}
@Override
- public Mono<String> readFile(Bucket bucket, String fileName) {
+ public Mono<byte[]> readObject(Bucket bucket, String fileName) {
return getDataFromS3Object(bucket(bucket), fileName);
}
- @Override
- public Mono<String> readFile(String bucket, String fileName) {
- return getDataFromS3Object(bucket, fileName);
- }
-
public Mono<String> putObject(Bucket bucket, String fileName, String bodyString) {
PutObjectRequest request = PutObjectRequest.builder() //
.bucket(bucket(bucket)) //
@@ -162,11 +151,18 @@
.doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
}
- public Mono<String> copyFileToS3(Bucket bucket, Path fromFile, String toFile) {
- return copyFileToS3Bucket(bucket(bucket), fromFile, toFile);
+ @Override
+ public Mono<String> copyFileTo(Path fromFile, String toFile) {
+ return copyFileToS3Bucket(bucket(Bucket.FILES), fromFile, toFile);
}
- public Mono<String> createS3Bucket(Bucket bucket) {
+ public Mono<Boolean> fileExists(Bucket bucket, String key) {
+ return this.getHeadObject(bucket(bucket), key).map(obj -> true) //
+ .onErrorResume(t -> Mono.just(false));
+ }
+
+ @Override
+ public Mono<String> create(Bucket bucket) {
return createS3Bucket(bucket(bucket));
}
@@ -180,10 +176,11 @@
return Mono.fromFuture(future) //
.map(f -> s3Bucket) //
- .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
+ .doOnError(t -> logger.trace("Could not create S3 bucket: {}", t.getMessage()))
.onErrorResume(t -> Mono.just(s3Bucket));
}
+ @Override
public Mono<String> deleteBucket(Bucket bucket) {
return deleteAllFiles(bucket) //
.collectList() //
@@ -191,7 +188,6 @@
.map(resp -> "OK")
.doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(bucket), t.getMessage()))
.onErrorResume(t -> Mono.just("NOK"));
-
}
private Flux<DeleteObjectsResponse> deleteAllFiles(Bucket bucket) {
@@ -269,7 +265,7 @@
}
private String bucket(Bucket bucket) {
- return bucket == Bucket.FILES ? appConfig.getS3Bucket() : appConfig.getS3Bucket();
+ return bucket == Bucket.FILES ? applicationConfig.getS3Bucket() : applicationConfig.getS3LocksBucket();
}
private Mono<String> copyFileToS3Bucket(String s3Bucket, Path fileName, String s3Key) {
@@ -296,7 +292,7 @@
return Mono.fromFuture(future);
}
- private Mono<String> getDataFromS3Object(String bucket, String key) {
+ private Mono<byte[]> getDataFromS3Object(String bucket, String key) {
GetObjectRequest request = GetObjectRequest.builder() //
.bucket(bucket) //
@@ -307,9 +303,10 @@
s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
return Mono.fromFuture(future) //
- .map(b -> new String(b.asByteArray(), Charset.defaultCharset())) //
- .doOnError(t -> logger.error("Failed to get file from S3 {}", t.getMessage())) //
- .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
+ .map(BytesWrapper::asByteArray) //
+ .doOnError(
+ t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key, bucket, t.getMessage())) //
+ .doOnNext(n -> logger.debug("Read file from S3: {} {}", bucket, key)) //
.onErrorResume(t -> Mono.empty());
}
diff --git a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java
index 6b9210f..2323d0e 100644
--- a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java
+++ b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/model/Counters.java
@@ -21,6 +21,7 @@
package org.onap.dcaegen2.collectors.datafile.model;
import java.time.Instant;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Various counters that can be shown via a REST API.
@@ -40,6 +41,8 @@
private long totalReceivedEvents = 0;
private Instant lastEventTime = Instant.MIN;
+ public final AtomicInteger threadPoolQueueSize = new AtomicInteger();
+
public synchronized void incNoOfReceivedEvents() {
totalReceivedEvents++;
lastEventTime = Instant.now();
diff --git a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java
index c9f5028..2283418 100644
--- a/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java
+++ b/datafile/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java
@@ -25,7 +25,6 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -33,8 +32,8 @@
import org.onap.dcaegen2.collectors.datafile.commons.Scheme;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
import org.onap.dcaegen2.collectors.datafile.configuration.CertificateConfig;
+import org.onap.dcaegen2.collectors.datafile.datastore.DataStore;
import org.onap.dcaegen2.collectors.datafile.datastore.DataStore.Bucket;
-import org.onap.dcaegen2.collectors.datafile.datastore.S3ObjectStore;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.http.HttpsClientConnectionManagerUtil;
import org.onap.dcaegen2.collectors.datafile.model.Counters;
@@ -76,13 +75,12 @@
private static final Logger logger = LoggerFactory.getLogger(CollectAndReportFiles.class);
private final AppConfig appConfig;
- private final AtomicInteger threadPoolQueueSize = new AtomicInteger();
private Counters counters = new Counters();
private final KafkaSender<String, String> kafkaSender;
- private final S3ObjectStore s3ObjectStore;
+ private final DataStore dataStore;
/**
* Constructor for task registration in Datafile Workflow.
@@ -94,11 +92,9 @@
this.appConfig = applicationConfiguration;
this.kafkaSender = KafkaSender.create(kafkaSenderOptions());
initCerts();
- if (applicationConfiguration.isS3Enabled()) {
- this.s3ObjectStore = new S3ObjectStore(appConfig);
- } else {
- this.s3ObjectStore = null;
- }
+
+ this.dataStore = DataStore.create(applicationConfiguration);
+
start();
}
@@ -123,7 +119,8 @@
try {
logger.trace("Starting");
if (appConfig.isS3Enabled()) {
- tryCreateS3Bucket(appConfig.getS3Bucket()).subscribe();
+ this.dataStore.create(Bucket.FILES).subscribe();
+ this.dataStore.create(Bucket.LOCKS).subscribe();
}
Thread.sleep(delayMillis);
createMainTask().subscribe(null, s -> start(2000), null);
@@ -133,40 +130,35 @@
}
}
- private Mono<String> tryCreateS3Bucket(String s3Bucket) {
- return this.s3ObjectStore.createS3Bucket(Bucket.FILES) //
- .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage())) //
- .onErrorResume(t -> Mono.just(s3Bucket));
- }
-
Flux<FilePublishInformation> createMainTask() {
Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
return fetchFromKafka() //
- .doOnNext(fileReadyMessage -> threadPoolQueueSize.incrementAndGet()) //
+ .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.incrementAndGet()) //
.doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) //
.parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
- .doOnNext(fileReadyMessage -> threadPoolQueueSize.decrementAndGet()) //
+ .doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.decrementAndGet()) //
.flatMap(fileReadyMessage -> Flux.fromIterable(FileData.createFileData(fileReadyMessage)), true, 1) //
.flatMap(this::filterNotFetched, false, 1, 1) //
.flatMap(this::fetchFile, false, 1, 1) //
- .flatMap(this::moveFileToS3Bucket, false, 1) //
.flatMap(data -> reportFetchedFile(data, this.appConfig.collectedFileTopic), false, 1) //
.sequential() //
.doOnError(t -> logger.error("Received error: {}", t.toString())); //
}
+ private Mono<FileData> deleteLock(FileData info) {
+ return dataStore.deleteLock(lockName(info.name())).map(b -> info); //
+ }
+
private Mono<FilePublishInformation> moveFileToS3Bucket(FilePublishInformation info) {
if (this.appConfig.isS3Enabled()) {
-
- return s3ObjectStore.copyFileToS3(Bucket.FILES, locaFilePath(info), info.getName())
+ return dataStore.copyFileTo(locaFilePath(info), info.getName())
.doOnError(t -> logger.warn("Failed to store file '{}' in S3 {}", info.getName(), t.getMessage())) //
.retryWhen(Retry.fixedDelay(10, Duration.ofMillis(1000))) //
.map(f -> info) //
.doOnError(t -> logger.error("Failed to store file '{}' in S3 after retries {}", info.getName(),
t.getMessage())) //
.doOnNext(n -> logger.debug("Stored file in S3: {}", info.getName())) //
- .onErrorResume(t -> Mono.empty()) //
.doOnNext(sig -> deleteLocalFile(info));
} else {
return Mono.just(info);
@@ -174,17 +166,19 @@
}
private Mono<FileData> filterNotFetched(FileData fileData) {
- Path path = fileData.getLocalFilePath(this.appConfig);
- if (this.appConfig.isS3Enabled()) {
- return s3ObjectStore.fileExists(Bucket.FILES, fileData.name()) //
- .filter(exists -> !exists).filter(exists -> !path.toFile().exists()).map(f -> fileData); //
+ Path localPath = fileData.getLocalFilePath(this.appConfig);
- } else {
- return path.toFile().exists() ? Mono.empty() : Mono.just(fileData);
- }
+ return dataStore.fileExists(Bucket.FILES, fileData.name()) //
+ .filter(exists -> !exists) //
+ .filter(exists -> !localPath.toFile().exists()) //
+ .map(f -> fileData); //
}
+ private String lockName(String fileName) {
+ return fileName + ".lck";
+ }
+
private Path locaFilePath(FilePublishInformation info) {
return Paths.get(this.appConfig.collectedFilesPath, info.getName());
}
@@ -239,14 +233,15 @@
return new FileCollector(appConfig, counters);
}
- int getThreadPoolQueueSize() {
- return this.threadPoolQueueSize.get();
- }
-
- protected Mono<FilePublishInformation> fetchFile(FileData fileData) {
- return createFileCollector() //
- .collectFile(fileData, FILE_TRANSFER_MAX_RETRIES, FILE_TRANSFER_INITIAL_RETRY_TIMEOUT) //
- .onErrorResume(exception -> handleFetchFileFailure(fileData, exception));
+ private Mono<FilePublishInformation> fetchFile(FileData fileData) {
+ return this.dataStore.createLock(lockName(fileData.name())).filter(granted -> granted) //
+ .map(granted -> createFileCollector()) //
+ .flatMap(collector -> collector.collectFile(fileData, FILE_TRANSFER_MAX_RETRIES,
+ FILE_TRANSFER_INITIAL_RETRY_TIMEOUT)) //
+ .flatMap(this::moveFileToS3Bucket) //
+ .doOnNext(b -> deleteLock(fileData).subscribe()) //
+ .doOnError(b -> deleteLock(fileData).subscribe()) //
+ .onErrorResume(exception -> handleFetchFileFailure(fileData, exception)); //
}
private Mono<FilePublishInformation> handleFetchFileFailure(FileData fileData, Throwable t) {
diff --git a/datafile/src/test/java/org/onap/dcaegen2/collectors/datafile/MockDatafile.java b/datafile/src/test/java/org/onap/dcaegen2/collectors/datafile/MockDatafile.java
index a2b793c..02f8882 100644
--- a/datafile/src/test/java/org/onap/dcaegen2/collectors/datafile/MockDatafile.java
+++ b/datafile/src/test/java/org/onap/dcaegen2/collectors/datafile/MockDatafile.java
@@ -28,6 +28,7 @@
import java.io.File;
import java.io.IOException;
+import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
@@ -37,8 +38,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.datastore.DataStore;
import org.onap.dcaegen2.collectors.datafile.datastore.DataStore.Bucket;
-import org.onap.dcaegen2.collectors.datafile.datastore.S3ObjectStore;
import org.onap.dcaegen2.collectors.datafile.model.Counters;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
@@ -74,8 +75,8 @@
"app.s3.endpointOverride=http://localhost:9000", //
"app.s3.accessKeyId=minio", //
"app.s3.secretAccessKey=miniostorage", //
- "app.s3.bucket=ropfiles" //
- })
+ "app.s3.bucket=ropfiles", //
+ "app.s3.locksBucket=locks"})
@SuppressWarnings("squid:S3577") // Not containing any tests since it is a mock.
class MockDatafile {
@@ -154,16 +155,16 @@
}
}
- static class CollectAndReportFilesMock extends CollectAndReportFiles {
+ static class FileCollectorMock extends FileCollector {
final AppConfig appConfig;
- public CollectAndReportFilesMock(AppConfig appConfig) {
- super(appConfig);
+ public FileCollectorMock(AppConfig appConfig) {
+ super(appConfig, new Counters());
this.appConfig = appConfig;
}
@Override // (override fetchFile to disable the actual file fetching)
- protected Mono<FilePublishInformation> fetchFile(FileData fileData) {
+ public Mono<FilePublishInformation> collectFile(FileData fileData, long numRetries, Duration firstBackoff) {
FileCollector fc = new FileCollector(this.appConfig, new Counters());
FilePublishInformation i = fc.createFilePublishInformation(fileData);
@@ -179,6 +180,20 @@
}
}
+ static class CollectAndReportFilesMock extends CollectAndReportFiles {
+ final AppConfig appConfig;
+
+ public CollectAndReportFilesMock(AppConfig appConfig) {
+ super(appConfig);
+ this.appConfig = appConfig;
+ }
+
+ @Override // (override fetchFile to disable the actual file fetching)
+ protected FileCollector createFileCollector() {
+ return new FileCollectorMock(appConfig);
+ }
+ }
+
@TestConfiguration
static class TestBeanFactory {
@@ -199,8 +214,9 @@
@AfterEach
void afterEach() {
- S3ObjectStore store = new S3ObjectStore(this.appConfig);
+ DataStore store = DataStore.create(this.appConfig);
store.deleteBucket(Bucket.FILES).block();
+ store.deleteBucket(Bucket.LOCKS).block();
deleteAllFiles();
}
@@ -241,7 +257,7 @@
void testS3Concurrency() throws Exception {
waitForKafkaListener();
- final int NO_OF_OBJECTS = 10000;
+ final int NO_OF_OBJECTS = 10;
Instant startTime = Instant.now();