Making concurrency configurable
The concurrency is made configurable by means of a parameter in application.yaml.
The retry performed when storing in S3 fails is made less aggressive.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-853
Change-Id: I818ca633adbc08e7a74b1092327af5191200cc44
diff --git a/datafilecollector/config/application.yaml b/datafilecollector/config/application.yaml
index 71f3172..5b00a74 100644
--- a/datafilecollector/config/application.yaml
+++ b/datafilecollector/config/application.yaml
@@ -28,7 +28,9 @@
app:
filepath: config/datafile_endpoints_test.json
collected-files-path: "/tmp/onap_datafile/"
- # KAFKA boostrap servers. This is only needed if there are Information Types that uses a kafkaInputTopic
+ # Number of worker threads. An increased number may increase throughput, but will require more execution resources.
+ number-of-worker-treads: 200
+ # KAFKA bootstrap servers.
# several redundant boostrap servers can be specified, separated by a comma ','.
kafka:
bootstrap-servers: localhost:9092
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
index f8be04d..f25d6fd 100644
--- a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfig.java
@@ -95,6 +95,10 @@
@Value("${app.s3.locksBucket:}")
private String s3LocksBucket;
+ @Value("${app.number-of-worker-treads:200}")
+ @Getter
+ private int noOfWorkerThreads;
+
public String getS3LocksBucket() {
return s3LocksBucket.isEmpty() ? s3Bucket : s3LocksBucket;
}
diff --git a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java
index c4f2ac3..46e71cc 100644
--- a/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java
+++ b/datafilecollector/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/CollectAndReportFiles.java
@@ -70,7 +70,6 @@
.disableHtmlEscaping() //
.create(); //
- private static final int NUMBER_OF_WORKER_THREADS = 200;
private static final long FILE_TRANSFER_MAX_RETRIES = 2;
private static final Duration FILE_TRANSFER_INITIAL_RETRY_TIMEOUT = Duration.ofSeconds(2);
@@ -132,11 +131,12 @@
}
Flux<FilePublishInformation> createMainTask() {
- Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", NUMBER_OF_WORKER_THREADS);
+ final int noOfWorkerThreads = appConfig.getNoOfWorkerThreads();
+ Scheduler scheduler = Schedulers.newParallel("FileCollectorWorker", noOfWorkerThreads);
return fetchFromKafka() //
.doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.incrementAndGet()) //
.doOnNext(fileReadyMessage -> counters.incNoOfReceivedEvents()) //
- .parallel(NUMBER_OF_WORKER_THREADS) // Each FileReadyMessage in a separate thread
+ .parallel(noOfWorkerThreads) // Each FileReadyMessage in a separate thread
.runOn(scheduler) //
.doOnNext(fileReadyMessage -> counters.threadPoolQueueSize.decrementAndGet()) //
.flatMap(fileReadyMessage -> Flux.fromIterable(FileData.createFileData(fileReadyMessage)), true, 1) //
@@ -155,7 +155,7 @@
if (this.appConfig.isS3Enabled()) {
return dataStore.copyFileTo(locaFilePath(info), info.getName())
.doOnError(t -> logger.warn("Failed to store file '{}' in S3 {}", info.getName(), t.getMessage())) //
- .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(1000))) //
+ .retryWhen(Retry.backoff(4, Duration.ofMillis(1000))) //
.map(f -> info) //
.doOnError(t -> logger.error("Failed to store file '{}' in S3 after retries {}", info.getName(),
t.getMessage())) //