NONRTRIC - dmaap adapter characteristic improvement

Minor changes, aesthetics.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I7dc76691f45d30555be66511e1b78c6e5231d01f
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java
index a412370..991ecc5 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/DataConsumer.java
@@ -21,6 +21,7 @@
 package org.oran.dmaapadapter.tasks;
 
 import lombok.Getter;
+import lombok.ToString;
 
 import org.oran.dmaapadapter.repository.Job;
 import org.slf4j.Logger;
@@ -43,6 +44,17 @@
     private Disposable subscription;
     private final ErrorStats errorStats = new ErrorStats();
 
+    @ToString
+    public static class DataToConsumer {
+        public final String key;
+        public final String value;
+
+        public DataToConsumer(String key, String value) {
+            this.key = key;
+            this.value = value;
+        }
+    }
+
     private class ErrorStats {
         private int consumerFaultCounter = 0;
         private boolean irrecoverableError = false; // eg. overflow
@@ -73,7 +85,7 @@
         this.job = job;
     }
 
-    public synchronized void start(Flux<TopicListener.Output> input) {
+    public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
         stop();
         this.errorStats.resetIrrecoverableErrors();
         this.subscription = handleReceivedMessage(input, job) //
@@ -89,7 +101,7 @@
         stop();
     }
 
-    protected abstract Mono<String> sendToClient(TopicListener.Output output);
+    protected abstract Mono<String> sendToClient(DataToConsumer output);
 
     public synchronized void stop() {
         if (this.subscription != null) {
@@ -102,17 +114,16 @@
         return this.subscription != null;
     }
 
-    private Flux<TopicListener.Output> handleReceivedMessage(Flux<TopicListener.Output> inputFlux, Job job) {
-        Flux<TopicListener.Output> result =
-                inputFlux.map(input -> new TopicListener.Output(input.key, job.filter(input.value))) //
-                        .filter(t -> !t.value.isEmpty()); //
+    private Flux<DataToConsumer> handleReceivedMessage(Flux<TopicListener.DataFromTopic> inputFlux, Job job) {
+        Flux<DataToConsumer> result = inputFlux.map(input -> new DataToConsumer(input.key, job.filter(input.value))) //
+                .filter(t -> !t.value.isEmpty()); //
 
         if (job.isBuffered()) {
             result = result.map(input -> quoteNonJson(input.value, job)) //
                     .bufferTimeout( //
                             job.getParameters().getBufferTimeout().getMaxSize(), //
                             job.getParameters().getBufferTimeout().getMaxTime()) //
-                    .map(buffered -> new TopicListener.Output("", buffered.toString()));
+                    .map(buffered -> new DataToConsumer("", buffered.toString()));
         }
         return result;
     }
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
index 69226ca..fc571d5 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
@@ -45,7 +45,7 @@
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
     private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
-    private Flux<Output> output;
+    private Flux<DataFromTopic> dataFromDmaap;
 
     public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, SecurityContext securityContext) {
         AsyncRestClientFactory restclientFactory =
@@ -56,14 +56,14 @@
     }
 
     @Override
-    public Flux<Output> getOutput() {
-        if (this.output == null) {
-            this.output = createOutput();
+    public Flux<DataFromTopic> getFlux() {
+        if (this.dataFromDmaap == null) {
+            this.dataFromDmaap = startFetchFromDmaap();
         }
-        return this.output;
+        return this.dataFromDmaap;
     }
 
-    private Flux<Output> createOutput() {
+    private Flux<DataFromTopic> startFetchFromDmaap() {
         return Flux.range(0, Integer.MAX_VALUE) //
                 .flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
                 .doOnNext(input -> logger.debug("Received from DMaap: {} :{}", this.type.getDmaapTopicUrl(), input)) //
@@ -71,7 +71,7 @@
                 .doFinally(sig -> logger.error("DmaapTopicListener stopped, reason: {}", sig)) //
                 .publish() //
                 .autoConnect() //
-                .map(input -> new Output("", input)); //
+                .map(input -> new DataFromTopic("", input)); //
     }
 
     private String getDmaapUrl() {
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java
index 87a6b67..b2ade98 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/HttpDataConsumer.java
@@ -39,7 +39,7 @@
     }
 
     @Override
-    protected Mono<String> sendToClient(TopicListener.Output output) {
+    protected Mono<String> sendToClient(DataToConsumer output) {
         Job job = this.getJob();
         logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), output);
         MediaType contentType = job.isBuffered() || job.getType().isJson() ? MediaType.APPLICATION_JSON : null;
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java
index 406c6f3..94a7aeb 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaDataConsumer.java
@@ -54,7 +54,7 @@
     }
 
     @Override
-    protected Mono<String> sendToClient(TopicListener.Output data) {
+    protected Mono<String> sendToClient(DataToConsumer data) {
         Job job = this.getJob();
 
         logger.debug("Sending data '{}' to Kafka topic: {}", data, this.getJob().getParameters().getKafkaOutputTopic());
@@ -67,7 +67,7 @@
     }
 
     @Override
-    public synchronized void start(Flux<TopicListener.Output> input) {
+    public synchronized void start(Flux<TopicListener.DataFromTopic> input) {
         super.start(input);
         SenderOptions<String, String> senderOptions = senderOptions(appConfig);
         this.sender = KafkaSender.create(senderOptions);
@@ -93,7 +93,7 @@
         return SenderOptions.create(props);
     }
 
-    private SenderRecord<String, String, Integer> senderRecord(TopicListener.Output output, Job infoJob) {
+    private SenderRecord<String, String, Integer> senderRecord(DataToConsumer output, Job infoJob) {
         int correlationMetadata = 2;
         String topic = infoJob.getParameters().getKafkaOutputTopic();
         return SenderRecord.create(new ProducerRecord<>(topic, output.key, output.value), correlationMetadata);
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
index 8d36fdd..11c0c28 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java
@@ -44,7 +44,7 @@
     private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
-    private Flux<Output> output;
+    private Flux<DataFromTopic> dataFromTopic;
 
     public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
         this.applicationConfig = applicationConfig;
@@ -52,14 +52,14 @@
     }
 
     @Override
-    public Flux<Output> getOutput() {
-        if (this.output == null) {
-            this.output = createOutput();
+    public Flux<DataFromTopic> getFlux() {
+        if (this.dataFromTopic == null) {
+            this.dataFromTopic = startReceiveFromTopic();
         }
-        return this.output;
+        return this.dataFromTopic;
     }
 
-    private Flux<Output> createOutput() {
+    private Flux<DataFromTopic> startReceiveFromTopic() {
         logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId());
         return KafkaReceiver.create(kafkaInputProperties()) //
                 .receive() //
@@ -69,7 +69,7 @@
                 .doFinally(sig -> logger.error("KafkaTopicReceiver stopped, reason: {}", sig)) //
                 .publish() //
                 .autoConnect() //
-                .map(input -> new Output(input.key(), input.value())); //
+                .map(input -> new DataFromTopic(input.key(), input.value())); //
     }
 
     private ReceiverOptions<String, String> kafkaInputProperties() {
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
index 54254a3..cb7c3de 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java
@@ -26,15 +26,15 @@
 public interface TopicListener {
 
     @ToString
-    public static class Output {
+    public static class DataFromTopic {
         public final String key;
         public final String value;
 
-        public Output(String key, String value) {
+        public DataFromTopic(String key, String value) {
             this.key = key;
             this.value = value;
         }
     }
 
-    public Flux<Output> getOutput();
+    public Flux<DataFromTopic> getFlux();
 }
diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
index 6c0f48f..4f3148d 100644
--- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
+++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
@@ -101,7 +101,7 @@
     private void addConsumer(Job job, MultiMap<DataConsumer> consumers, Map<String, TopicListener> topicListeners) {
         TopicListener topicListener = topicListeners.get(job.getType().getId());
         DataConsumer consumer = createConsumer(job);
-        consumer.start(topicListener.getOutput());
+        consumer.start(topicListener.getFlux());
         consumers.put(job.getType().getId(), job.getId(), consumer);
     }
 
diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
index a3febaf..7eaf7ab 100644
--- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
+++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java
@@ -300,7 +300,7 @@
 
         // Handle received data from Kafka, check that it has been posted to the
         // consumer
-        kafkaConsumer.start(Flux.just(new TopicListener.Output("key", "data")));
+        kafkaConsumer.start(Flux.just(new TopicListener.DataFromTopic("key", "data")));
 
         ConsumerController.TestResults consumer = this.consumerController.testResults;
         await().untilAsserted(() -> assertThat(consumer.receivedBodies).hasSize(1));
diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
index 0fa7a8e..bc650f7 100644
--- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
+++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
@@ -158,7 +158,7 @@
 
     private static class KafkaReceiver {
         public final String OUTPUT_TOPIC;
-        private TopicListener.Output receivedKafkaOutput;
+        private TopicListener.DataFromTopic receivedKafkaOutput;
         private final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
 
         int count = 0;
@@ -171,13 +171,13 @@
             InfoType type = new InfoType("id", null, false, OUTPUT_TOPIC, "dataType", false);
             KafkaTopicListener topicListener = new KafkaTopicListener(applicationConfig, type);
 
-            topicListener.getOutput() //
+            topicListener.getFlux() //
                     .doOnNext(this::set) //
                     .doFinally(sig -> logger.info("Finally " + sig)) //
                     .subscribe();
         }
 
-        private void set(TopicListener.Output receivedKafkaOutput) {
+        private void set(TopicListener.DataFromTopic receivedKafkaOutput) {
             this.receivedKafkaOutput = receivedKafkaOutput;
             this.count++;
             logger.debug("*** received {}, {}", OUTPUT_TOPIC, receivedKafkaOutput);
@@ -193,7 +193,7 @@
 
         void reset() {
             count = 0;
-            this.receivedKafkaOutput = new TopicListener.Output("", "");
+            this.receivedKafkaOutput = new TopicListener.DataFromTopic("", "");
         }
     }