NONRTRIC - Implement DMaaP mediator producer service in Java
Fixed so that an information type can receive data from a Kafka stream.
This can also filter the data (using regexp matching).
The received data can be buffered to minimize the number of REST calls to deliver the data to the consumer.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-597
Change-Id: Ie3740898bd919908a7ec5753f7d6050c652cebe4
diff --git a/dmaap-adaptor-java/README.md b/dmaap-adaptor-java/README.md
index 0378bc7..9b35fe5 100644
--- a/dmaap-adaptor-java/README.md
+++ b/dmaap-adaptor-java/README.md
@@ -15,7 +15,7 @@
[
{
"id": "STD_Fault_Messages",
- "dmaapTopicUrl": events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages",
+ "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD-Fault-Messages_1.0.0",
"useHttpProxy": false
}
]
diff --git a/dmaap-adaptor-java/config/application.yaml b/dmaap-adaptor-java/config/application.yaml
index 5733ea7..6a2d68a 100644
--- a/dmaap-adaptor-java/config/application.yaml
+++ b/dmaap-adaptor-java/config/application.yaml
@@ -51,6 +51,9 @@
# configuration from the Consul will override the file.
configuration-filepath: /opt/app/dmaap-adaptor-service/data/application_configuration.json
dmaap-base-url: http://dradmin:dradmin@localhost:2222
- # The url used to adress this component. This is used as a callback url sent to other components.
+ # The url used to address this component. This is used as a callback url sent to other components.
dmaap-adapter-base-url: https://localhost:8435
+ # KAFKA bootstrap server. This is only needed if there are Information Types that use a kafkaInputTopic
+ kafka:
+ bootstrap-servers: localhost:9092
diff --git a/dmaap-adaptor-java/pom.xml b/dmaap-adaptor-java/pom.xml
index 1fbd83c..411b27c 100644
--- a/dmaap-adaptor-java/pom.xml
+++ b/dmaap-adaptor-java/pom.xml
@@ -205,6 +205,16 @@
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.projectreactor.kafka</groupId>
+ <artifactId>reactor-kafka</artifactId>
+ <version>1.3.7</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>31.0.1-jre</version>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java
index c9ba93f..faf5742 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/BeanFactory.java
@@ -27,7 +27,8 @@
import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Jobs;
-import org.oran.dmaapadapter.tasks.DmaapMessageConsumer;
+import org.oran.dmaapadapter.tasks.DmaapTopicConsumer;
+import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
@@ -37,6 +38,7 @@
@Configuration
public class BeanFactory {
+ private InfoTypes infoTypes;
@Value("${server.http-port}")
private int httpPort = 0;
@@ -47,16 +49,24 @@
}
@Bean
- public InfoTypes types(@Autowired ApplicationConfig appConfig, @Autowired Jobs jobs) {
+ public InfoTypes types(@Autowired ApplicationConfig appConfig, @Autowired Jobs jobs,
+ @Autowired KafkaTopicConsumers kafkaConsumers) {
+ if (infoTypes != null) {
+ return infoTypes;
+ }
+
Collection<InfoType> types = appConfig.getTypes();
// Start a consumer for each type
for (InfoType type : types) {
- DmaapMessageConsumer topicConsumer = new DmaapMessageConsumer(appConfig, type, jobs);
- topicConsumer.start();
+ if (type.isDmaapTopicDefined()) {
+ DmaapTopicConsumer topicConsumer = new DmaapTopicConsumer(appConfig, type, jobs);
+ topicConsumer.start();
+ }
}
-
- return new InfoTypes(types);
+ infoTypes = new InfoTypes(types);
+ kafkaConsumers.start(infoTypes);
+ return infoTypes;
}
@Bean
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
index e26fd46..f17a9c0 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/configuration/ApplicationConfig.java
@@ -88,6 +88,10 @@
@Value("${app.dmaap-base-url}")
private String dmaapBaseUrl;
+ @Getter
+ @Value("${app.kafka.bootstrap-servers:}")
+ private String kafkaBootStrapServers;
+
private WebClientConfig webClientConfig = null;
public WebClientConfig getWebClientConfig() {
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
index ca7c96c..e4dca5b 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
@@ -85,7 +85,7 @@
logger.info("Job started callback {}", request.id);
Job job = new Job(request.id, request.targetUri, types.getType(request.typeId), request.owner,
- request.lastUpdated);
+ request.lastUpdated, toJobParameters(request.jobData));
this.jobs.put(job);
return new ResponseEntity<>(HttpStatus.OK);
} catch (Exception e) {
@@ -93,6 +93,11 @@
}
}
+ private Job.Parameters toJobParameters(Object jobData) {
+ String json = gson.toJson(jobData);
+ return gson.fromJson(json, Job.Parameters.class);
+ }
+
@GetMapping(path = JOB_URL, produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(summary = "Get all jobs", description = "Returns all info jobs, can be used for trouble shooting")
@ApiResponse(responseCode = "200", //
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java
index 9dda1e6..27b527d 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoType.java
@@ -22,6 +22,8 @@
import lombok.Getter;
+import org.springframework.util.StringUtils;
+
public class InfoType {
@Getter
@@ -33,10 +35,22 @@
@Getter
private final boolean useHttpProxy;
- public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy) {
+ @Getter
+ private final String kafkaInputTopic;
+
+ public InfoType(String id, String dmaapTopicUrl, boolean useHttpProxy, String kafkaInputTopic) {
this.id = id;
this.dmaapTopicUrl = dmaapTopicUrl;
this.useHttpProxy = useHttpProxy;
+ this.kafkaInputTopic = kafkaInputTopic;
+ }
+
+ public boolean isKafkaTopicDefined() {
+ return StringUtils.hasLength(kafkaInputTopic);
+ }
+
+ public boolean isDmaapTopicDefined() {
+ return StringUtils.hasLength(dmaapTopicUrl);
}
}
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java
index b8677a3..558fc46 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/InfoTypes.java
@@ -35,7 +35,6 @@
private Map<String, InfoType> allTypes = new HashMap<>();
public InfoTypes(Collection<InfoType> types) {
-
for (InfoType type : types) {
put(type);
}
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java
index 0da94a6..d1697e9 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java
@@ -20,10 +20,42 @@
package org.oran.dmaapadapter.repository;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
import lombok.Getter;
+import org.immutables.gson.Gson;
+
public class Job {
+ @Gson.TypeAdapters
+ public static class Parameters {
+ public String filter;
+ public BufferTimeout bufferTimeout;
+
+ public Parameters() {
+ }
+
+ public Parameters(String filter, BufferTimeout bufferTimeout) {
+ this.filter = filter;
+ this.bufferTimeout = bufferTimeout;
+ }
+
+ public static class BufferTimeout {
+ public BufferTimeout(int maxSize, int maxTimeMiliseconds) {
+ this.maxSize = maxSize;
+ this.maxTimeMiliseconds = maxTimeMiliseconds;
+ }
+
+ public BufferTimeout() {
+ }
+
+ public int maxSize;
+ public int maxTimeMiliseconds;
+ }
+ }
+
@Getter
private final String id;
@@ -37,14 +69,38 @@
private final String owner;
@Getter
+ private final Parameters parameters;
+
+ @Getter
private final String lastUpdated;
- public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated) {
+ private final Pattern jobDataFilter;
+
+ public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, Parameters parameters) {
this.id = id;
this.callbackUrl = callbackUrl;
this.type = type;
this.owner = owner;
this.lastUpdated = lastUpdated;
+ this.parameters = parameters;
+ if (parameters != null && parameters.filter != null) {
+ jobDataFilter = Pattern.compile(parameters.filter);
+ } else {
+ jobDataFilter = null;
+ }
+ }
+
+ public boolean isFilterMatch(String data) {
+ if (jobDataFilter == null) {
+ return true;
+ }
+ Matcher matcher = jobDataFilter.matcher(data);
+ return matcher.find();
+ }
+
+ public boolean isBuffered() {
+ return parameters != null && parameters.bufferTimeout != null && parameters.bufferTimeout.maxSize > 0
+ && parameters.bufferTimeout.maxTimeMiliseconds > 0;
}
}
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java
index 6e2b326..8a38824 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java
@@ -26,8 +26,10 @@
import java.util.Vector;
import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@@ -36,8 +38,11 @@
private Map<String, Job> allJobs = new HashMap<>();
private MultiMap<Job> jobsByType = new MultiMap<>();
+ private final KafkaTopicConsumers kafkaConsumers;
- public Jobs() {}
+ public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers) {
+ this.kafkaConsumers = kafkaConsumers;
+ }
public synchronized Job getJob(String id) throws ServiceException {
Job job = allJobs.get(id);
@@ -52,9 +57,10 @@
}
public synchronized void put(Job job) {
- logger.debug("Put service: {}", job.getId());
+ logger.debug("Put job: {}", job.getId());
allJobs.put(job.getId(), job);
jobsByType.put(job.getType().getId(), job.getId(), job);
+ kafkaConsumers.addJob(job);
}
public synchronized Iterable<Job> getAll() {
@@ -72,6 +78,7 @@
public synchronized void remove(Job job) {
this.allJobs.remove(job.getId());
jobsByType.remove(job.getType().getId(), job.getId());
+ kafkaConsumers.removeJob(job);
}
public synchronized int size() {
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java
similarity index 85%
rename from dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java
rename to dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java
index b7c4ec6..7d55758 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapMessageConsumer.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java
@@ -39,15 +39,16 @@
* consumers that has a job for this InformationType.
*/
-public class DmaapMessageConsumer {
+public class DmaapTopicConsumer {
private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10);
- private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
- private final ApplicationConfig applicationConfig;
+ private static final Logger logger = LoggerFactory.getLogger(DmaapTopicConsumer.class);
+
private final AsyncRestClient dmaapRestClient;
- private final AsyncRestClient consumerRestClient;
- private final InfoType type;
- private final Jobs jobs;
private final InfiniteFlux infiniteSubmitter = new InfiniteFlux();
+ private final AsyncRestClient consumerRestClient;
+ protected final ApplicationConfig applicationConfig;
+ protected final InfoType type;
+ protected final Jobs jobs;
/** Submits new elements until stopped */
private static class InfiniteFlux {
@@ -80,10 +81,10 @@
}
}
- public DmaapMessageConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
- this.applicationConfig = applicationConfig;
+ public DmaapTopicConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
+ this.applicationConfig = applicationConfig;
this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
: restclientFactory.createRestClientNoHttpProxy("");
this.type = type;
@@ -93,31 +94,24 @@
public void start() {
infiniteSubmitter.start() //
.flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
- .flatMap(this::handleReceivedMessage, 5) //
+ .flatMap(this::pushDataToConsumers) //
.subscribe(//
- value -> logger.debug("DmaapMessageConsumer next: {} {}", value, type.getId()), //
+ null, //
throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
- () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId()) //
- );
+ () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId())); //
+
}
private String getDmaapUrl() {
-
return this.applicationConfig.getDmaapBaseUrl() + type.getDmaapTopicUrl();
}
private Mono<String> handleDmaapErrorResponse(Throwable t) {
logger.debug("error from DMAAP {} {}", t.getMessage(), type.getDmaapTopicUrl());
- return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES) //
- .flatMap(notUsed -> Mono.empty());
+ return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES).flatMap(notUsed -> Mono.empty());
}
- private Mono<String> handleConsumerErrorResponse(Throwable t) {
- logger.warn("error from CONSUMER {}", t.getMessage());
- return Mono.empty();
- }
-
- protected Mono<String> getFromMessageRouter(String topicUrl) {
+ private Mono<String> getFromMessageRouter(String topicUrl) {
logger.trace("getFromMessageRouter {}", topicUrl);
return dmaapRestClient.get(topicUrl) //
.filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
@@ -125,9 +119,14 @@
.onErrorResume(this::handleDmaapErrorResponse); //
}
- protected Flux<String> handleReceivedMessage(String body) {
- logger.debug("Received from DMAAP {}", body);
- final int CONCURRENCY = 5;
+ private Mono<String> handleConsumerErrorResponse(Throwable t) {
+ logger.warn("error from CONSUMER {}", t.getMessage());
+ return Mono.empty();
+ }
+
+ protected Flux<String> pushDataToConsumers(String body) {
+ logger.debug("Received data {}", body);
+ final int CONCURRENCY = 50;
// Distibute the body to all jobs for this type
return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
@@ -135,5 +134,4 @@
.flatMap(job -> consumerRestClient.post(job.getCallbackUrl(), body), CONCURRENCY) //
.onErrorResume(this::handleConsumerErrorResponse);
}
-
}
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java
new file mode 100644
index 0000000..6079edf
--- /dev/null
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java
@@ -0,0 +1,130 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.repository.InfoType;
+import org.oran.dmaapadapter.repository.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+import reactor.core.publisher.Sinks.Many;
+import reactor.kafka.receiver.KafkaReceiver;
+import reactor.kafka.receiver.ReceiverOptions;
+
+/**
+ * The class receives messages from the Kafka topic of an InformationType and
+ * sends them further to the consumers that have a job for this type.
+ */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+public class KafkaTopicConsumer {
+ private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumer.class);
+ private final AsyncRestClient consumerRestClient;
+ private final ApplicationConfig applicationConfig;
+ private final InfoType type;
+ private final Many<String> consumerDistributor;
+
+ public KafkaTopicConsumer(ApplicationConfig applicationConfig, InfoType type) {
+ this.applicationConfig = applicationConfig;
+
+ final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 10;
+ this.consumerDistributor = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+
+ AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
+ this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
+ : restclientFactory.createRestClientNoHttpProxy("");
+ this.type = type;
+ startKafkaTopicReceiver();
+ }
+
+ private Disposable startKafkaTopicReceiver() {
+ return KafkaReceiver.create(kafkaInputProperties()) //
+ .receive() //
+ .flatMap(this::onReceivedData) //
+ .subscribe(null, //
+ throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), //
+ () -> logger.warn("KafkaMessageConsumer stopped"));
+ }
+
+ private Flux<String> onReceivedData(ConsumerRecord<Integer, String> input) {
+ logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
+ consumerDistributor.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
+ return consumerDistributor.asFlux();
+ }
+
+ public Disposable startDistributeToConsumer(Job job) {
+ return getMessagesFromKafka(job) //
+ .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
+ .flatMap(body -> consumerRestClient.post(job.getCallbackUrl(), body)) //
+ .onErrorResume(this::handleConsumerErrorResponse) //
+ .subscribe(null, //
+ throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), //
+ () -> logger.warn("KafkaMessageConsumer stopped {}", job.getType().getId()));
+ }
+
+ private Flux<String> getMessagesFromKafka(Job job) {
+ if (job.isBuffered()) {
+ return consumerDistributor.asFlux() //
+ .filter(job::isFilterMatch) //
+ .bufferTimeout(job.getParameters().bufferTimeout.maxSize,
+ Duration.ofMillis(job.getParameters().bufferTimeout.maxTimeMiliseconds)) //
+ .flatMap(o -> Flux.just(o.toString()));
+ } else {
+ return consumerDistributor.asFlux() //
+ .filter(job::isFilterMatch);
+ }
+ }
+
+ private Mono<String> handleConsumerErrorResponse(Throwable t) {
+ logger.warn("error from CONSUMER {}", t.getMessage());
+ return Mono.empty();
+ }
+
+ private ReceiverOptions<Integer, String> kafkaInputProperties() {
+ Map<String, Object> consumerProps = new HashMap<>();
+ if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
+ logger.error("No kafka boostrap server is setup");
+ }
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adaptor");
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+ return ReceiverOptions.<Integer, String>create(consumerProps)
+ .subscription(Collections.singleton(this.type.getKafkaInputTopic()));
+ }
+
+}
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java
new file mode 100644
index 0000000..23d9da2
--- /dev/null
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java
@@ -0,0 +1,79 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.repository.InfoType;
+import org.oran.dmaapadapter.repository.InfoTypes;
+import org.oran.dmaapadapter.repository.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
+
+/**
+ * The class keeps one KafkaTopicConsumer for each InformationType that has a
+ * Kafka input topic, and starts or stops the delivery of data for a job.
+ */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+@Component
+public class KafkaTopicConsumers {
+ private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class);
+
+ private final Map<String, KafkaTopicConsumer> topicConsumers = new HashMap<>();
+ private final Map<String, Disposable> activeSubscriptions = new HashMap<>();
+ private final ApplicationConfig appConfig;
+
+ public KafkaTopicConsumers(@Autowired ApplicationConfig appConfig) {
+ this.appConfig = appConfig;
+ }
+
+ public void start(InfoTypes types) {
+ for (InfoType type : types.getAll()) {
+ if (type.isKafkaTopicDefined()) {
+ KafkaTopicConsumer topicConsumer = new KafkaTopicConsumer(appConfig, type);
+ topicConsumers.put(type.getId(), topicConsumer);
+ }
+ }
+ }
+
+ public synchronized void addJob(Job job) {
+ if (this.activeSubscriptions.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
+ logger.debug("Kafka job added {}", job.getId());
+ KafkaTopicConsumer topicConsumer = topicConsumers.get(job.getType().getId());
+ Disposable subscription = topicConsumer.startDistributeToConsumer(job);
+ activeSubscriptions.put(job.getId(), subscription);
+ }
+ }
+
+ public synchronized void removeJob(Job job) {
+ Disposable d = activeSubscriptions.remove(job.getId());
+ if (d != null) {
+ logger.debug("Kafka job removed {}", job.getId());
+ d.dispose();
+ }
+ }
+
+}
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
index 4a68ab0..e8b236c 100644
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
@@ -20,14 +20,21 @@
package org.oran.dmaapadapter.tasks;
+import com.google.common.io.CharStreams;
import com.google.gson.JsonParser;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
import lombok.Getter;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo;
import org.oran.dmaapadapter.r1.ProducerRegistrationInfo;
import org.oran.dmaapadapter.repository.InfoType;
@@ -111,12 +118,12 @@
private Mono<String> registerTypesAndProducer() {
final int CONCURRENCY = 20;
- final String producerUrl =
- applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+ final String producerUrl = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/"
+ + PRODUCER_ID;
return Flux.fromIterable(this.types.getAll()) //
.doOnNext(type -> logger.info("Registering type {}", type.getId())) //
- .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo())),
+ .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo(type))),
CONCURRENCY) //
.collectList() //
.doOnNext(type -> logger.info("Registering producer")) //
@@ -127,18 +134,39 @@
return jsonObject("{}");
}
- private ProducerInfoTypeInfo typeRegistrationInfo() {
- return new ProducerInfoTypeInfo(jsonSchemaObject(), typeSpecifcInfoObject());
+ private ProducerInfoTypeInfo typeRegistrationInfo(InfoType type) {
+ try {
+ return new ProducerInfoTypeInfo(jsonSchemaObject(type), typeSpecifcInfoObject());
+ } catch (Exception e) {
+ logger.error("Fatal error {}", e.getMessage());
+ return null;
+ }
}
- private Object jsonSchemaObject() {
- // An object with no properties
- String schemaStr = "{" //
- + "\"type\": \"object\"," //
- + "\"properties\": {}," //
- + "\"additionalProperties\": false" //
- + "}"; //
- return jsonObject(schemaStr);
+ private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException {
+
+ if (type.isKafkaTopicDefined()) {
+ String schemaStrKafka = readSchemaFile("/typeSchemaKafka.json");
+ return jsonObject(schemaStrKafka);
+ } else {
+ // An object with no properties
+ String schemaStr = "{" //
+ + "\"type\": \"object\"," //
+ + "\"properties\": {}," //
+ + "\"additionalProperties\": false" //
+ + "}"; //
+
+ return jsonObject(schemaStr);
+ }
+ }
+
+ private String readSchemaFile(String filePath) throws IOException, ServiceException {
+ InputStream in = getClass().getResourceAsStream(filePath);
+ logger.debug("Reading application schema file from: {} with: {}", filePath, in);
+ if (in == null) {
+ throw new ServiceException("Could not readfile: " + filePath);
+ }
+ return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8));
}
private Object jsonObject(String json) {
diff --git a/dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json b/dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json
new file mode 100644
index 0000000..0ff7c80
--- /dev/null
+++ b/dmaap-adaptor-java/src/main/resources/typeSchemaKafka.json
@@ -0,0 +1,26 @@
+{
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "properties": {
+ "filter": {
+ "type": "string"
+ },
+ "bufferTimeout": {
+ "type": "object",
+ "properties": {
+ "maxSize": {
+ "type": "integer"
+ },
+ "maxTimeMiliseconds": {
+ "type": "integer"
+ }
+ },
+ "required": [
+ "maxSize",
+ "maxTimeMiliseconds"
+ ]
+ }
+ },
+ "required": [
+ ]
+}
diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java
index 828b027..8d1dda6 100644
--- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java
+++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/EcsSimulatorController.java
@@ -79,7 +79,6 @@
} else {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
-
}
@PutMapping(path = API_ROOT + "/info-producers/{infoProducerId}", //
diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java
index 1cceef0..376d23e 100644
--- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java
+++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithEcs.java
@@ -52,6 +52,7 @@
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
+@SuppressWarnings("java:S3577") // Rename class
@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
@TestPropertySource(properties = { //
diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
new file mode 100644
index 0000000..31ef970
--- /dev/null
+++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
@@ -0,0 +1,257 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2021 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import com.google.gson.JsonParser;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
+import org.oran.dmaapadapter.configuration.ImmutableHttpProxyConfig;
+import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig;
+import org.oran.dmaapadapter.configuration.WebClientConfig;
+import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.dmaapadapter.r1.ConsumerJobInfo;
+import org.oran.dmaapadapter.repository.InfoType;
+import org.oran.dmaapadapter.repository.InfoTypes;
+import org.oran.dmaapadapter.repository.Job;
+import org.oran.dmaapadapter.repository.Jobs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
+import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import reactor.core.publisher.Flux;
+import reactor.kafka.sender.KafkaSender;
+import reactor.kafka.sender.SenderOptions;
+import reactor.kafka.sender.SenderRecord;
+
+@SuppressWarnings("java:S3577") // Sonar: test class naming convention
+@ExtendWith(SpringExtension.class)
+@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
+@TestPropertySource(properties = { //
+ "server.ssl.key-store=./config/keystore.jks", //
+ "app.webclient.trust-store=./config/truststore.jks", //
+ "app.configuration-filepath=./src/test/resources/test_application_configuration_kafka.json"//
+})
+class IntegrationWithKafka {
+
+    private static final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
+
+    // Serializes the job parameters to JSON (see jobParametersAsJsonObject)
+    private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
+
+    // Beans injected from the Spring test context
+    @Autowired
+    private ApplicationConfig applicationConfig;
+    @Autowired
+    private Jobs jobs;
+    @Autowired
+    private InfoTypes types;
+    @Autowired
+    private ConsumerController consumerController;
+    @Autowired
+    private EcsSimulatorController ecsSimulatorController;
+
+    // Port injected via @LocalServerPort.
+    // NOTE(review): the code visible in this class reads the port through applicationConfig instead.
+    @LocalServerPort
+    int localServerHttpPort;
+
+    /** Application configuration whose ECS, DMaaP and self URLs all point at this test process. */
+    static class TestApplicationConfig extends ApplicationConfig {
+        private String localHttpsUrl() {
+            return "https://localhost:" + getLocalServerHttpPort();
+        }
+
+        @Override
+        public String getSelfUrl() {
+            return localHttpsUrl();
+        }
+
+        @Override
+        public String getEcsBaseUrl() {
+            return localHttpsUrl();
+        }
+
+        @Override
+        public String getDmaapBaseUrl() {
+            return localHttpsUrl();
+        }
+    }
+
+    /**
+     * Bean overrides for this test: a plain Tomcat servlet container and the
+     * {@link TestApplicationConfig} in place of the beans declared by BeanFactory.
+     */
+    @TestConfiguration
+    static class TestBeanFactory extends BeanFactory {
+        @Bean
+        @Override
+        public ServletWebServerFactory servletContainer() {
+            return new TomcatServletWebServerFactory();
+        }
+
+        @Bean
+        @Override
+        public ApplicationConfig getApplicationConfig() {
+            // its ECS, DMaaP and self URLs resolve to this test process
+            return new TestApplicationConfig();
+        }
+    }
+
+    @AfterEach
+    void reset() { // leave no recorded results or jobs behind for the next test
+        consumerController.testResults.reset();
+        ecsSimulatorController.testResults.reset();
+        jobs.clear();
+    }
+
+    /**
+     * Creates a REST client for {@link #baseUrl()} through createRestClientNoHttpProxy, reusing the
+     * key store and trust store settings of the application configuration.
+     * @param useTrustValidation value for the isTrustStoreUsed setting of the client configuration
+     */
+    private AsyncRestClient restClient(boolean useTrustValidation) {
+        final WebClientConfig appConfig = this.applicationConfig.getWebClientConfig();
+        final HttpProxyConfig emptyProxy = ImmutableHttpProxyConfig.builder().httpProxyHost("").httpProxyPort(0).build();
+        final WebClientConfig clientConfig = ImmutableWebClientConfig.builder() //
+                .keyStoreType(appConfig.keyStoreType()) //
+                .keyStorePassword(appConfig.keyStorePassword()) //
+                .keyStore(appConfig.keyStore()) //
+                .keyPassword(appConfig.keyPassword()) //
+                .isTrustStoreUsed(useTrustValidation) //
+                .trustStore(appConfig.trustStore()) //
+                .trustStorePassword(appConfig.trustStorePassword()) //
+                .httpProxyConfig(emptyProxy).build();
+        return new AsyncRestClientFactory(clientConfig).createRestClientNoHttpProxy(baseUrl());
+    }
+
+    private AsyncRestClient restClient() { // overload with useTrustValidation = false
+        return this.restClient(false);
+    }
+
+    private String baseUrl() { // https on localhost at the port reported by the application config
+        return "https://localhost:".concat(String.valueOf(this.applicationConfig.getLocalServerHttpPort()));
+    }
+
+    // Serializes the job parameters with Gson and parses the text back via jsonObject().
+    private Object jobParametersAsJsonObject(String filter, int maxTimeMiliseconds, int maxSize) {
+        // BufferTimeout is given (maxSize, maxTimeMiliseconds), i.e. the reverse of this method's order
+        final var bufferTimeout = new Job.Parameters.BufferTimeout(maxSize, maxTimeMiliseconds);
+        return jsonObject(this.gson.toJson(new Job.Parameters(filter, bufferTimeout)));
+    }
+
+    private Object jsonObject(String json) { // parses json into a Gson JsonObject
+        try {
+            return JsonParser.parseString(json).getAsJsonObject();
+        } catch (RuntimeException e) { // malformed JSON, or JSON that is not an object
+            throw new IllegalArgumentException("Not a JSON object: " + json, e);
+        }
+    }
+
+    // Job info for the first configured type with this process as target; failures are rethrown, not hidden as null.
+    private ConsumerJobInfo consumerJobInfo(String filter, int maxTimeMiliseconds, int maxSize) {
+        try {
+            final String typeId = this.types.getAll().iterator().next().getId();
+            final String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+            final Object jobParameters = jobParametersAsJsonObject(filter, maxTimeMiliseconds, maxSize);
+            return new ConsumerJobInfo(typeId, jobParameters, "owner", targetUri, "");
+        } catch (Exception e) {
+            throw new IllegalStateException("Could not create the consumer job info", e);
+        }
+    }
+
+    // Options for the test's Kafka sender: Integer keys and String values, sent to the
+    // bootstrap servers named in the application configuration.
+    private SenderOptions<Integer, String> senderOptions() {
+        final Map<String, Object> producerConfig = new HashMap<>();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        return SenderOptions.create(producerConfig);
+    }
+
+    private SenderRecord<Integer, String, Integer> senderRecord(String data, int i) { // key i, value data + i
+        final String topic = this.types.getAll().iterator().next().getKafkaInputTopic();
+        return SenderRecord.create(new ProducerRecord<>(topic, i, data + i), i);
+    }
+
+    /** Sends three messages to Kafka and checks what the filtering job and the buffering job deliver. */
+    @Test
+    void kafkaIntegrationTest() throws InterruptedException {
+        final String JOB_ID1 = "ID1";
+        final String JOB_ID2 = "ID2";
+        // Wait for the producer and its single type to be registered in the simulated ECS
+        await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
+        assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
+
+        // JOB_ID1 accepts everything and has buffering parameters; JOB_ID2 only matches "Message_1"
+        this.ecsSimulatorController.addJob(consumerJobInfo(".*", 10, 1000), JOB_ID1, restClient());
+        this.ecsSimulatorController.addJob(consumerJobInfo(".*Message_1.*", 0, 0), JOB_ID2, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+
+        // Publish Message_1, Message_2 and Message_3; the sender is closed even if sending fails
+        var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_", i));
+        final KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions());
+        try {
+            sender.send(dataToSend) //
+                    .doOnError(e -> logger.error("Send failed", e)) //
+                    .doOnNext(senderResult -> logger.debug("Sent {}", senderResult)) //
+                    .blockLast();
+        } finally {
+            sender.close();
+        }
+
+        ConsumerController.TestResults consumer = this.consumerController.testResults;
+        await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(2));
+        assertThat(consumer.receivedBodies.get(0)).isEqualTo("Message_1");
+        assertThat(consumer.receivedBodies.get(1)).isEqualTo("[Message_1, Message_2, Message_3]");
+        // Delete both jobs
+        this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
+        this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+    }
+
+}
diff --git a/dmaap-adaptor-java/src/test/resources/test_application_configuration.json b/dmaap-adaptor-java/src/test/resources/test_application_configuration.json
index 8d211b8..794eb8e 100644
--- a/dmaap-adaptor-java/src/test/resources/test_application_configuration.json
+++ b/dmaap-adaptor-java/src/test/resources/test_application_configuration.json
@@ -3,7 +3,7 @@
{
"id": "ExampleInformationType",
"dmaapTopicUrl": "/dmaap-topic-1",
- "useHttpProxy": true
+ "useHttpProxy": false
}
]
}
\ No newline at end of file
diff --git a/dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json b/dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json
new file mode 100644
index 0000000..e2ea525
--- /dev/null
+++ b/dmaap-adaptor-java/src/test/resources/test_application_configuration_kafka.json
@@ -0,0 +1,9 @@
+{
+ "types": [
+ {
+ "id": "ExampleInformationType",
+ "kafkaInputTopic": "TutorialTopic",
+ "useHttpProxy": false
+ }
+ ]
+}
\ No newline at end of file