NONRTRIC - dmaap adapter characteristic improvement
Minor changes, aesthetics.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I7dc76691f45d30555be66511e1b78c6e5231d01f
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java
index a412370..991ecc5 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java
@@ -21,6 +21,7 @@
package org.oran.dmaapadapter.tasks;
import lombok.Getter;
+import lombok.ToString;
import org.oran.dmaapadapter.repository.Job;
import org.slf4j.Logger;
@@ -43,6 +44,17 @@
private Disposable subscription;
private final ErrorStats errorStats = new ErrorStats();
+ @ToString
+ public static class DataToConsumer {
+ public final String key;
+ public final String value;
+
+ public DataToConsumer(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+ }
+
private class ErrorStats {
private int consumerFaultCounter = 0;
private boolean irrecoverableError = false; // eg. overflow
@@ -73,7 +85,7 @@
this.job = job;
}
- public synchronized void start(Flux<TopicListener.Output> input) {
+ public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
stop();
this.errorStats.resetIrrecoverableErrors();
this.subscription = handleReceivedMessage(input, job) //
@@ -89,7 +101,7 @@
stop();
}
- protected abstract Mono<String> sendToClient(TopicListener.Output output);
+ protected abstract Mono<String> sendToClient(DataToConsumer output);
public synchronized void stop() {
if (this.subscription != null) {
@@ -102,17 +114,16 @@
return this.subscription != null;
}
- private Flux<TopicListener.Output> handleReceivedMessage(Flux<TopicListener.Output> inputFlux, Job job) {
- Flux<TopicListener.Output> result =
- inputFlux.map(input -> new TopicListener.Output(input.key, job.filter(input.value))) //
- .filter(t -> !t.value.isEmpty()); //
+ private Flux<DataToConsumer> handleReceivedMessage(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
+ Flux<DataToConsumer> result = inputFlux.map(input -> new DataToConsumer(input.key, job.filter(input.value))) //
+ .filter(t -> !t.value.isEmpty()); //
if (job.isBuffered()) {
result = result.map(input -> quoteNonJson(input.value, job)) //
.bufferTimeout( //
job.getParameters().getBufferTimeout().getMaxSize(), //
job.getParameters().getBufferTimeout().getMaxTime()) //
- .map(buffered -> new TopicListener.Output("", buffered.toString()));
+ .map(buffered -> new DataToConsumer("", buffered.toString()));
}
return result;
}
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
index 69226ca..fc571d5 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
@@ -45,7 +45,7 @@
private final ApplicationConfig applicationConfig;
private final InfoType type;
private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
- private Flux<Output> output;
+ private Flux<DataFromTopic> dataFromDmaap;
public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, SecurityContext securityContext) {
AsyncRestClientFactory restclientFactory =
@@ -56,14 +56,14 @@
}
@Override
- public Flux<Output> getOutput() {
- if (this.output == null) {
- this.output = createOutput();
+ public Flux<DataFromTopic> getFlux() {
+ if (this.dataFromDmaap == null) {
+ this.dataFromDmaap = startFetchFromDmaap();
}
- return this.output;
+ return this.dataFromDmaap;
}
- private Flux<Output> createOutput() {
+ private Flux<DataFromTopic> startFetchFromDmaap() {
return Flux.range(0, Integer.MAX_VALUE) //
.flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
.doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) //
@@ -71,7 +71,7 @@
.doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) //
.publish() //
.autoConnect() //
- .map(input -> new Output("", input)); //
+ .map(input -> new DataFromTopic("", input)); //
}
private String getDmaapUrl() {
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java
index 87a6b67..b2ade98 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java
@@ -39,7 +39,7 @@
}
@Override
- protected Mono<String> sendToClient(TopicListener.Output output) {
+ protected Mono<String> sendToClient(DataToConsumer output) {
Job job = this.getJob();
logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output);
MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java
index 406c6f3..94a7aeb 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java
@@ -54,7 +54,7 @@
}
@Override
- protected Mono<String> sendToClient(TopicListener.Output data) {
+ protected Mono<String> sendToClient(DataToConsumer data) {
Job job = this.getJob();
logger.debug("Sending data '{}' to Kafka topic: {}", data, this.getJob().getParameters().getKafkaOutputTopic());
@@ -67,7 +67,7 @@
}
@Override
- public synchronized void start(Flux<TopicListener.Output> input) {
+ public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
super.start(input);
SenderOptions<String, String> senderOptions = senderOptions(appConfig);
this.sender = KafkaSender.create(senderOptions);
@@ -93,7 +93,7 @@
return SenderOptions.create(props);
}
- private SenderRecord<String, String, Integer> senderRecord(TopicListener.Output output, Job infoJob) {
+ private SenderRecord<String, String, Integer> senderRecord(DataToConsumer output, Job infoJob) {
int correlationMetadata = 2;
String topic = infoJob.getParameters().getKafkaOutputTopic();
return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata);
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
index 8d36fdd..11c0c28 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
@@ -44,7 +44,7 @@
private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
private final ApplicationConfig applicationConfig;
private final InfoType type;
- private Flux<Output> output;
+ private Flux<DataFromTopic> dataFromTopic;
public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
this.applicationConfig = applicationConfig;
@@ -52,14 +52,14 @@
}
@Override
- public Flux<Output> getOutput() {
- if (this.output == null) {
- this.output = createOutput();
+ public Flux<DataFromTopic> getFlux() {
+ if (this.dataFromTopic == null) {
+ this.dataFromTopic = startReceiveFromTopic();
}
- return this.output;
+ return this.dataFromTopic;
}
- private Flux<Output> createOutput() {
+ private Flux<DataFromTopic> startReceiveFromTopic() {
logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId());
return KafkaReceiver.create(kafkaInputProperties()) //
.receive() //
@@ -69,7 +69,7 @@
.doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
.publish() //
.autoConnect() //
- .map(input -> new Output(input.key(), input.value())); //
+ .map(input -> new DataFromTopic(input.key(), input.value())); //
}
private ReceiverOptions<String, String> kafkaInputProperties() {
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
index 54254a3..cb7c3de 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
@@ -26,15 +26,15 @@
public interface TopicListener {
@ToString
- public static class Output {
+ public static class DataFromTopic {
public final String key;
public final String value;
- public Output(String key, String value) {
+ public DataFromTopic(String key, String value) {
this.key = key;
this.value = value;
}
}
- public Flux<Output> getOutput();
+ public Flux<DataFromTopic> getFlux();
}
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
index 6c0f48f..4f3148d 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
@@ -101,7 +101,7 @@
private void addConsumer(Job job, MultiMap<DataConsumer> consumers, Map<String, TopicListener> topicListeners) {
TopicListener topicListener = topicListeners.get(job.getType().getId());
DataConsumer consumer = createConsumer(job);
- consumer.start(topicListener.getOutput());
+ consumer.start(topicListener.getFlux());
consumers.put(job.getType().getId(), job.getId(), consumer);
}
diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
index a3febaf..7eaf7ab 100644
--- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
+++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
@@ -300,7 +300,7 @@
// Handle received data from Kafka, check that it has been posted to the
// consumer
- kafkaConsumer.start(Flux.just(new TopicListener.Output("key", "data")));
+ kafkaConsumer.start(Flux.just(new TopicListener.DataFromTopic("key", "data")));
ConsumerController.TestResults consumer = this.consumerController.testResults;
await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
index 0fa7a8e..bc650f7 100644
--- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
+++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
@@ -158,7 +158,7 @@
private static class KafkaReceiver {
public final String OUTPUT_TOPIC;
- private TopicListener.Output receivedKafkaOutput;
+ private TopicListener.DataFromTopic receivedKafkaOutput;
private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
int count = 0;
@@ -171,13 +171,13 @@
InfoType type = new InfoType("id", null, false, OUTPUT_TOPIC, "dataType", false);
KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type);
- topicListener.getOutput() //
+ topicListener.getFlux() //
.doOnNext(this::set) //
.doFinally(sig -> logger.info("Finally " + sig)) //
.subscribe();
}
- private void set(TopicListener.Output receivedKafkaOutput) {
+ private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
this.receivedKafkaOutput = receivedKafkaOutput;
this.count++;
logger.debug("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput);
@@ -193,7 +193,7 @@
void reset() {
count = 0;
- this.receivedKafkaOutput = new TopicListener.Output("", "");
+ this.receivedKafkaOutput = new TopicListener.DataFromTopic("", "");
}
}