ICS sample producer and consumer
Sample Java producer and consumer that integrates with ICS callbacks.
Issue-ID: NONRTRIC-965
Change-Id: I7319b46802444af130a3bd0d5c6bdd12f97c9904
Signed-off-by: lapentafd <francesco.lapenta@est.tech>
diff --git a/sample-services/ics-producer-consumer/.gitignore b/sample-services/ics-producer-consumer/.gitignore
new file mode 100644
index 0000000..851f236
--- /dev/null
+++ b/sample-services/ics-producer-consumer/.gitignore
@@ -0,0 +1,26 @@
+.idea
+.DS_Store
+target
+logging.log
+
+*.class
+logs/
+.vscode
+*.log
+
+*.ctxt
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
diff --git a/sample-services/ics-producer-consumer/README.md b/sample-services/ics-producer-consumer/README.md
new file mode 100644
index 0000000..1731c17
--- /dev/null
+++ b/sample-services/ics-producer-consumer/README.md
@@ -0,0 +1,102 @@
+<!--
+* ========================LICENSE_START=================================
+* O-RAN-SC
+*
+* Copyright (C) 2024 OpenInfra Foundation Europe. All rights reserved.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ========================LICENSE_END===================================
+-->
+# Automatic
+### Using Kafka with a Java Producer and Consumer
+
+Run the demo script
+It will check prerequisites, build a consumer and producer, and run them with kafka and ICS.
+
+```shell
+./start.sh
+```
+Or run the other script to bring up RedPanda on port 8888 and NONRTRIC control panel UI on port 8181
+
+```shell
+./red.sh
+```
+
+For a faster execution you can add:
+
+--skip-build to skip creating the app jar and building the docker images
+
+--no-console to skip running RedPanda and NONRTRIC control panel
+
+```shell
+./red.sh --skip-build --no-console
+```
+# Manual
+### Run Kafka in a container
+
+```shell
+docker-compose up -d kafka-zkless
+```
+
+### Starting the REST application individually
+
+In a new terminal window:
+
+```shell
+mvn spring-boot:run
+```
+
+### Starting a producer
+
+```shell
+sh ./runproducer.sh
+```
+
+### Starting a Consumer
+
+```shell
+sh ./runconsumer.sh
+```
+
+## Reading the logs
+
+A sample of the output is as follows:
+
+```
+Demo Producer Docker logs
+
+2024-04-02 12:48:05 INFO c.d.p.p.SimpleProducer:141 - {"bootstrapServers":"kafka-zkless:9092","topic":"mytopic","source":"com.demo.producer.producer.SimpleProducer","message":"ygHwxXSIxW","key":"f8f1a7a7-a78e-4c7d-9b8d-108bb0cc9e2c"}
+2024-04-02 12:48:06 INFO c.d.p.p.SimpleProducer:141 - {"bootstrapServers":"kafka-zkless:9092","topic":"mytopic","source":"com.demo.producer.producer.SimpleProducer","message":"KNIbP10zfN","key":"b058d00f-bbcd-4d2c-936b-6327847d4c2a"}
+2024-04-02 12:48:07 INFO c.d.p.p.SimpleProducer:141 - {"bootstrapServers":"kafka-zkless:9092","topic":"mytopic","source":"com.demo.producer.producer.SimpleProducer","message":"V6fH1NkdeH","key":"ae1a83a3-d8a7-40c8-9d98-529230f8b585"}
+2024-04-02 12:48:08 INFO c.d.p.p.SimpleProducer:141 - {"bootstrapServers":"kafka-zkless:9092","topic":"mytopic","source":"com.demo.producer.producer.SimpleProducer","message":"m76qvRFh6f","key":"abccde52-fa72-4fd4-99ab-5bc21514d825"}
+2024-04-02 12:48:09 INFO c.d.p.p.SimpleProducer:141 - {"bootstrapServers":"kafka-zkless:9092","topic":"mytopic","source":"com.demo.producer.producer.SimpleProducer","message":"t7FJYnFr43","key":"0602239e-34e9-45a6-a04a-3c67b4c7d9e4"}
+
+++++++++++++++++++++++++++++++++++++++++++++++++++++
+
+Demo Consumer Docker logs
+
+2024-04-02 12:48:05 INFO c.d.c.c.SimpleConsumer:158 - {"message":"Topic: mytopicMessage: ygHwxXSIxW"}
+2024-04-02 12:48:06 INFO c.d.c.c.SimpleConsumer:158 - {"message":"Topic: mytopicMessage: KNIbP10zfN"}
+2024-04-02 12:48:07 INFO c.d.c.c.SimpleConsumer:158 - {"message":"Topic: mytopicMessage: V6fH1NkdeH"}
+2024-04-02 12:48:08 INFO c.d.c.c.SimpleConsumer:158 - {"message":"Topic: mytopicMessage: m76qvRFh6f"}
+2024-04-02 12:48:09 INFO c.d.c.c.SimpleConsumer:158 - {"message":"Topic: mytopicMessage: t7FJYnFr43"}
+
+++++++++++++++++++++++++++++++++++++++++++++++++++++
+
+ICS logs
+
+2024-04-02T12:48:05.615Z DEBUG 1 --- [or-http-epoll-2] o.o.i.c.r1producer.ProducerCallbacks : Job subscription 1 started OK 1
+2024-04-02T12:48:05.820Z DEBUG 1 --- [io-8083-exec-10] o.o.i.repository.InfoTypeSubscriptions : Added type status subscription 1
+```
+
+The script will fail (exit 1) if there are any ERRORS logged in the kafka-producer and kafka-consumer.
\ No newline at end of file
diff --git a/sample-services/ics-producer-consumer/application.yaml b/sample-services/ics-producer-consumer/application.yaml
new file mode 100644
index 0000000..cfb4f78
--- /dev/null
+++ b/sample-services/ics-producer-consumer/application.yaml
@@ -0,0 +1,83 @@
+# ============LICENSE_START===============================================
+# Copyright (C) 2019-2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+spring:
+ profiles:
+ active: prod
+ main:
+ allow-bean-definition-overriding: true
+ aop:
+ auto: false
+springdoc:
+ show-actuator: true
+management:
+ endpoints:
+ web:
+ exposure:
+ # Enabling of springboot actuator features. See springboot documentation.
+ include: "loggers,logfile,health,info,metrics,threaddump,heapdump,shutdown"
+ endpoint:
+ shutdown:
+ enabled: true
+lifecycle:
+ timeout-per-shutdown-phase: "20s"
+logging:
+ # Configuration of logging
+ level:
+ ROOT: INFO
+ org.springframework: INFO
+ org.springframework.data: INFO
+ org.springframework.web.reactive.function.client.ExchangeFunctions: INFO
+ org.oransc.ics: DEBUG
+ file:
+ name: /var/log/information-coordinator-service/application.log
+server:
+ # Configuration of the HTTP/REST server. The parameters are defined and handled by the springboot framework.
+ # See springboot documentation.
+ port : 8434
+ http-port: 8083
+ ssl:
+ key-store-type: JKS
+ key-store-password: policy_agent
+ key-store: /opt/app/information-coordinator-service/etc/cert/keystore.jks
+ key-password: policy_agent
+ key-alias: policy_agent
+ shutdown: "graceful"
+app:
+ webclient:
+ # Configuration of the trust store used for the HTTP client (outgoing requests)
+ # The file location and the password for the truststore is only relevant if trust-store-used == true
+ # Note that the same keystore as for the server is used.
+ trust-store-used: false
+ trust-store-password: policy_agent
+ trust-store: /opt/app/information-coordinator-service/etc/cert/truststore.jks
+ # Configuration of usage of HTTP Proxy for the southbound accesses.
+ # The HTTP proxy (if configured) will only be used for accessing NearRT RIC:s
+ http.proxy-host:
+ http.proxy-port: 0
+ vardata-directory: /var/information-coordinator-service
+ # If the file name is empty, no authorization token is used
+ auth-token-file:
+ # A URL to an authorization provider such as OPA. Each time an information job is accessed, a call to this
+ # authorization provider is done for access control. If this is empty, no fine grained access control is done.
+ info-job-authorization-agent:
+ # S3 object store usage is enabled by defining the bucket to use. This will override the vardata-directory parameter.
+ s3:
+ endpointOverride: http://localhost:9000
+ accessKeyId: minio
+ secretAccessKey: miniostorage
+ bucket:
\ No newline at end of file
diff --git a/sample-services/ics-producer-consumer/consumer/Dockerfile b/sample-services/ics-producer-consumer/consumer/Dockerfile
new file mode 100644
index 0000000..d5d6f2a
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/Dockerfile
@@ -0,0 +1,49 @@
+#==================================================================================
+# Copyright (C) 2024: OpenInfra Foundation Europe
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This source code is part of the near-RT RIC (RAN Intelligent Controller)
+# platform project (RICP).
+#==================================================================================
+
+# Use Maven image with OpenJDK 17 for the build stage
+FROM maven:3.8.5-openjdk-17 AS maven_build
+
+# Copy Maven project files
+COPY pom.xml /tmp/
+COPY src /tmp/src/
+
+# Set working directory
+WORKDIR /tmp/
+
+# Build the Maven project
+RUN mvn package
+
+# Use a separate image with OpenJDK 17 for the runtime stage
+FROM openjdk:17-jdk-slim
+
+# Expose port 8081
+EXPOSE 8081
+
+ARG KAFKA_SERVERS
+ENV KAFKA_SERVERS=${KAFKA_SERVERS}
+
+# Set the working directory
+WORKDIR /app
+
+# Copy the JAR file from the maven_build stage to the runtime stage
+COPY --from=maven_build /tmp/target/consumer-0.0.1.jar /app/consumer-0.0.1.jar
+
+# Command to run the application
+CMD ["java", "-jar", "consumer-0.0.1.jar"]
diff --git a/sample-services/ics-producer-consumer/consumer/Makefile b/sample-services/ics-producer-consumer/consumer/Makefile
new file mode 100644
index 0000000..4e87cd3
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/Makefile
@@ -0,0 +1,56 @@
+# ========================LICENSE_START=================================
+# O-RAN-SC
+#
+# Copyright (C) 2024: OpenInfra Foundation Europe
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+
+# Define variables
+IMAGE_NAME = "o-ran-sc/nonrtric-sample-icsconsumer"
+# Container name used by run/stop; an image reference is not a container name.
+CONTAINER_NAME = nonrtric-sample-icsconsumer
+DOCKERFILE = Dockerfile
+
+# Default target
+.PHONY: all
+all: build run
+
+# Target to build the Maven JAR
+.PHONY: jar
+jar:
+	mvn clean package
+
+# Target to build the Docker image
+.PHONY: build
+build:
+	docker build -t $(IMAGE_NAME) -f $(DOCKERFILE) .
+
+# Target to run the Docker container
+# The container is named so that the stop target can address it, and --rm
+# removes it on exit so that the name is free for the next run.
+.PHONY: run
+run:
+	docker run --rm --name $(CONTAINER_NAME) -p 8081:8081 $(IMAGE_NAME)
+
+# Target to stop and remove the Docker container
+# Failures are ignored because the container may not exist (any more).
+.PHONY: stop
+stop:
+	docker stop $(CONTAINER_NAME) || true
+	docker rm $(CONTAINER_NAME) || true
+
+# Target to clean up
+.PHONY: clean
+clean: stop
+	docker rmi $(IMAGE_NAME) || true
diff --git a/sample-services/ics-producer-consumer/consumer/container-tag.yaml b/sample-services/ics-producer-consumer/consumer/container-tag.yaml
new file mode 100644
index 0000000..48290af
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/container-tag.yaml
@@ -0,0 +1,23 @@
+# ========================LICENSE_START=================================
+# O-RAN-SC
+#
+# Copyright (C) 2024: OpenInfra Foundation Europe
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+
+# The Jenkins job requires a tag to build the Docker image.
+# By default this file is in the docker build directory,
+# but the location can be configured in the JJB template.
+---
+tag: 0.0.1
\ No newline at end of file
diff --git a/sample-services/ics-producer-consumer/consumer/pom.xml b/sample-services/ics-producer-consumer/consumer/pom.xml
new file mode 100644
index 0000000..84a6ae8
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+* ========================LICENSE_START=================================
+* O-RAN-SC
+*
+* Copyright (C) 2024 OpenInfra Foundation Europe. All rights reserved.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ========================LICENSE_END===================================
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>3.2.3</version>
+ <relativePath/>
+ </parent>
+
+ <groupId>com.demo.consumer</groupId>
+ <artifactId>consumer</artifactId>
+ <version>0.0.1</version>
+ <packaging>jar</packaging>
+
+ <properties>
+ <maven.compiler.source>17</maven.compiler.source>
+ <maven.compiler.target>17</maven.compiler.target>
+ </properties>
+
+ <dependencies>
+ <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>3.7.0</version>
+ <!--<scope>provided</scope> -->
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>1.18.24</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.10.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <configuration>
+ <mainClass>com.demo.consumer.Application</mainClass>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/Application.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/Application.java
new file mode 100644
index 0000000..0ba278b
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/Application.java
@@ -0,0 +1,66 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.annotation.EnableAsync;
+
+import com.demo.consumer.consumer.SimpleConsumer;
+import com.demo.consumer.messages.KafkaMessageHandlerImpl;
+
+/**
+ * Spring Boot entry point of the sample consumer.
+ */
+@SpringBootApplication
+@EnableAsync
+public class Application {
+
+    // Topic to consume (property vars.topic; no default is given).
+    @Value("${vars.topic}")
+    private String topic;
+
+    // Whether consumption starts on application ready (property vars.autostart, default false).
+    @Value("${vars.autostart:false}")
+    private boolean autostart;
+
+    @Autowired
+    private SimpleConsumer simpleConsumer;
+
+    public static void main(String[] args) throws Exception {
+        SpringApplication.run(Application.class, args);
+    }
+
+    /**
+     * Runs on ApplicationReadyEvent and starts the continuous consumer when autostart is set.
+     */
+    @EventListener(ApplicationReadyEvent.class)
+    public void checkAutoRun() throws Exception {
+        if (!autostart) {
+            return;
+        }
+        simpleConsumer.runAlways(topic, new KafkaMessageHandlerImpl());
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/consumer/SimpleConsumer.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/consumer/SimpleConsumer.java
new file mode 100644
index 0000000..430ab59
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/consumer/SimpleConsumer.java
@@ -0,0 +1,188 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import com.demo.consumer.messages.AbstractSimpleKafka;
+import com.demo.consumer.messages.KafkaMessageHandler;
+import com.demo.consumer.messages.MessageHelper;
+import com.demo.consumer.messages.PropertiesHelper;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.Properties;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Kafka consumer component with a bounded read (run) and a continuous polling loop
+ * (runAlways) that another thread can stop through shutdown.
+ */
+@Component
+@Setter
+@Getter
+public class SimpleConsumer extends AbstractSimpleKafka {
+    private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class);
+
+    // Kafka's documented default for max.poll.records, used when the properties omit it.
+    private static final String DEFAULT_MAX_POLL_RECORDS = "500";
+
+    // Shared between the polling thread and the thread calling shutdown(), hence volatile.
+    private volatile KafkaConsumer<String, String> kafkaConsumer = null;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    // Poll timeout in milliseconds (property vars.time, default 2000).
+    @Value("${vars.time:2000}")
+    private int TIME_OUT_MS;
+
+    /**
+     * Polls partition 0 of the topic and hands every returned record to the callback.
+     *
+     * @param topicName topic to read from
+     * @param numberOfRecords value used for max.poll.records; a non-positive value falls
+     *        back to the configured max.poll.records
+     * @param callback handler invoked for each record
+     * @throws Exception propagated from the helpers, the Kafka client or the callback
+     */
+    public void run(String topicName, int numberOfRecords, KafkaMessageHandler callback) throws Exception {
+        Properties props = PropertiesHelper.getProperties();
+        // A primitive int is never null, so only a positive value counts as provided.
+        Optional<Integer> recs = Optional.of(numberOfRecords).filter(n -> n > 0);
+
+        // adjust the number of records to get accordingly
+        Integer numOfRecs = recs.orElseGet(
+                () -> Integer.parseInt(props.getProperty("max.poll.records", DEFAULT_MAX_POLL_RECORDS)));
+        props.setProperty("max.poll.records", String.valueOf(numOfRecs));
+
+        // try-with-resources closes the consumer even when poll or the callback throws
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
+            // make the consumer available for graceful shutdown
+            setKafkaConsumer(consumer);
+            consumer.assign(Collections.singleton(new TopicPartition(topicName, 0)));
+            //consumer.seekToBeginning(consumer.assignment()); //--from-beginning
+            // recNum is reset to the batch size and decremented once per record, so the
+            // loop ends after the first poll whether or not it returned records.
+            int recNum = numOfRecs;
+            while (recNum > 0) {
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(TIME_OUT_MS));
+                recNum = records.count();
+                if (recNum == 0) {
+                    log.info(MessageHelper.getSimpleJSONObject("No records retrieved").toJSONString());
+                    break;
+                }
+
+                for (ConsumerRecord<String, String> record : records) {
+                    callback.processMessage(topicName, record);
+                    recNum--;
+                }
+            }
+        }
+    }
+
+    /** Closes the current consumer, or only logs when none has been created. */
+    private void close() throws Exception {
+        if (this.getKafkaConsumer() == null) {
+            log.info(MessageHelper.getSimpleJSONObject("The internal consumer is NULL").toJSONString());
+            return;
+        }
+        log.info(MessageHelper.getSimpleJSONObject("Closing consumer").toJSONString());
+        this.getKafkaConsumer().close();
+    }
+
+    /**
+     * Subscribes to the topic and polls until shutdown() is called from another thread.
+     * The consumer is closed when the loop ends, normally or through an exception.
+     *
+     * @param topicName topic to subscribe to
+     * @param callback handler invoked for each record
+     * @throws Exception propagated from the helpers, the Kafka client or the callback
+     */
+    public void runAlways(String topicName, KafkaMessageHandler callback) throws Exception {
+        Properties props = PropertiesHelper.getProperties();
+        // Clear a stop request left by an earlier shutdown() so that the loop can run again.
+        closed.set(false);
+        // make the consumer available for graceful shutdown
+        setKafkaConsumer(new KafkaConsumer<>(props));
+
+        // keep running forever or until shutdown() is called from another thread.
+        try {
+            getKafkaConsumer().subscribe(List.of(topicName));
+            while (!closed.get()) {
+                ConsumerRecords<String, String> records = getKafkaConsumer().poll(Duration.ofMillis(TIME_OUT_MS));
+                if (records.count() == 0) {
+                    log.info(MessageHelper.getSimpleJSONObject("No records retrieved").toJSONString());
+                }
+
+                for (ConsumerRecord<String, String> record : records) {
+                    callback.processMessage(topicName, record);
+                    log.info(MessageHelper.getSimpleJSONObject("Topic: " + topicName + "Message: " + record.value()).toJSONString());
+                }
+            }
+        } catch (WakeupException e) {
+            // Ignore exception if closing
+            if (!closed.get()) {
+                throw e;
+            }
+        } finally {
+            // Release the consumer on every exit path.
+            close();
+        }
+    }
+
+    /**
+     * Requests the runAlways loop to stop and wakes up a poll that is in progress.
+     * Does nothing beyond setting the flag when no consumer has been created yet.
+     */
+    public void shutdown() {
+        closed.set(true);
+        try {
+            log.info(MessageHelper.getSimpleJSONObject("Shutting down consumer").toJSONString());
+        } catch (Exception e) {
+            // Building the log message must not prevent the wakeup below.
+            log.warn("Could not log the consumer shutdown", e);
+        }
+        KafkaConsumer<String, String> consumer = getKafkaConsumer();
+        if (consumer != null) {
+            consumer.wakeup();
+        }
+    }
+
+    public KafkaConsumer<String, String> getKafkaConsumer() {
+        return kafkaConsumer;
+    }
+
+    public void setKafkaConsumer(KafkaConsumer<String, String> kafkaConsumer) {
+        this.kafkaConsumer = kafkaConsumer;
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ConsumerController.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ConsumerController.java
new file mode 100644
index 0000000..63cc215
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ConsumerController.java
@@ -0,0 +1,109 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.controllers;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.demo.consumer.repository.InfoType;
+import com.demo.consumer.repository.InfoTypes;
+import com.demo.consumer.repository.Job.Parameters;
+import com.demo.consumer.dme.ConsumerJobInfo;
+import com.demo.consumer.dme.ConsumerStatusInfo;
+import com.demo.consumer.repository.Jobs;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * REST callbacks served under /consumer: job creation and info type status notifications.
+ */
+@RestController
+@RequestMapping(path = "/consumer", produces = "application/json")
+public class ConsumerController {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerController.class);
+
+    private static final Gson gson = new GsonBuilder().create();
+
+    private final Jobs jobs;
+    private final InfoTypes types;
+
+    /**
+     * Stores the injected repositories and registers the sample info type "type1",
+     * whose Kafka input topic is "mytopic".
+     */
+    public ConsumerController(@Autowired Jobs jobs, @Autowired InfoTypes types) {
+        this.jobs = jobs;
+        this.types = types;
+        InfoType type1 = InfoType.builder().build();
+        Parameters p = Parameters.builder().build();
+        type1.setId("type1");
+        type1.setKafkaInputTopic("mytopic");
+        type1.setInputJobType("type1");
+        type1.setInputJobDefinition(p);
+        types.put(type1);
+    }
+
+    /**
+     * Handles a job callback by adding the job to the job repository. A failure while
+     * adding the job is logged and not reported to the caller.
+     *
+     * @param requestBody JSON document parsed into a ConsumerJobInfo
+     * @param infoJobId job identity taken from the request path
+     */
+    @PostMapping("/job/{infoJobId}")
+    public void startinfojob(@RequestBody String requestBody, @PathVariable String infoJobId) {
+        ConsumerJobInfo request = gson.fromJson(requestBody, ConsumerJobInfo.class);
+        // SLF4J renders only arguments that have a {} placeholder in the message.
+        log.info("Add Job Info {}: {}", infoJobId, gson.toJson(request));
+        try {
+            // NOTE(review): the first argument passed to addJob is request.infoTypeId while
+            // infoJobId is only logged - confirm against Jobs.addJob that this is intended.
+            this.jobs.addJob(request.infoTypeId, types.getType(request.infoTypeId), request.owner,
+                    toJobParameters(request.jobDefinition));
+        } catch (Exception e) {
+            // The exception is the last argument so that SLF4J logs its stack trace.
+            log.error("Error adding the job {}", infoJobId, e);
+        }
+    }
+
+    /**
+     * Handles an info type status notification; the parsed content is only logged.
+     *
+     * @param requestBody JSON document parsed into a ConsumerStatusInfo
+     */
+    @PostMapping("/info-type-status")
+    public void statusChange(@RequestBody String requestBody) {
+        ConsumerStatusInfo request = gson.fromJson(requestBody, ConsumerStatusInfo.class);
+        log.info("Add Status Job Info: {}", gson.toJson(request));
+    }
+
+    /** Converts the untyped job definition into Parameters through a JSON round trip. */
+    private Parameters toJobParameters(Object jobData) {
+        String json = gson.toJson(jobData);
+        return gson.fromJson(json, Parameters.class);
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ThreadsController.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ThreadsController.java
new file mode 100644
index 0000000..f668e3a
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/controllers/ThreadsController.java
@@ -0,0 +1,122 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.controllers;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.demo.consumer.consumer.SimpleConsumer;
+import com.demo.consumer.messages.ApplicationMessageHandlerImpl;
+
+/**
+ * REST endpoints that start and stop Kafka consumption on background threads.
+ */
+@RestController
+public class ThreadsController {
+    private static final Logger log = LoggerFactory.getLogger(ThreadsController.class);
+
+    @Autowired
+    private SimpleConsumer simpleConsumer;
+
+    // Thread running the continuous consumer. It is written by startConsumer and read by
+    // stopConsumer, which can execute on different threads, hence volatile.
+    private volatile Thread consumerThread;
+
+    /**
+     * Starts a background thread that runs SimpleConsumer.runAlways on the given topic.
+     *
+     * @param topicName topic to consume from
+     * @return a completed future with a confirmation text, or a failed future if the
+     *         thread could not be created or started
+     */
+    @Async
+    @GetMapping("/startConsumer/{topicName}")
+    public CompletableFuture<String> startConsumer(@PathVariable("topicName") String topicName) {
+        try {
+            Thread worker = new Thread(() -> {
+                try {
+                    simpleConsumer.runAlways(topicName, new ApplicationMessageHandlerImpl());
+                } catch (Exception e) {
+                    // The exception is the last argument so that SLF4J logs its stack trace.
+                    log.error("Error starting consuming on: {}", topicName, e);
+                }
+            });
+            // Keep the thread in the field (not in a local) so that stopConsumer can find it.
+            consumerThread = worker;
+            worker.start();
+            return CompletableFuture.completedFuture("Consumer started for topic: " + topicName);
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
+    /**
+     * Asks the consumer started by startConsumer to shut down.
+     *
+     * @return a text telling whether an active consumer thread was found
+     */
+    @GetMapping("/stopConsumer")
+    public String stopConsumer() {
+        Thread worker = consumerThread;
+        if (worker == null || !worker.isAlive()) {
+            return "No active consumer to stop";
+        }
+        try {
+            simpleConsumer.shutdown();
+        } catch (Exception e) {
+            log.error("Error stopping consumer Thread", e);
+        }
+        return "Consumer stopped successfully";
+    }
+
+    /**
+     * Starts a background thread that runs SimpleConsumer.run for a bounded read.
+     *
+     * @param numberOfMessages number of records passed to SimpleConsumer.run
+     * @param topicName topic to read from
+     * @return a completed future with a confirmation text, or a failed future if the
+     *         thread could not be created or started
+     */
+    @GetMapping("/listen/{numberOfMessages}/on/{topicName}")
+    public CompletableFuture<String> listenMessage(@PathVariable int numberOfMessages, @PathVariable String topicName) {
+        try {
+            // Local on purpose: stopConsumer only tracks the thread started by startConsumer.
+            Thread listenerThread = new Thread(() -> {
+                try {
+                    simpleConsumer.run(topicName, numberOfMessages, new ApplicationMessageHandlerImpl());
+                } catch (Exception e) {
+                    log.error("Error starting consuming on: {}", topicName, e);
+                }
+            });
+            listenerThread.start();
+            return CompletableFuture.completedFuture("Consumer started for topic: " + topicName);
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerJobInfo.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerJobInfo.java
new file mode 100644
index 0000000..b3cd416
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerJobInfo.java
@@ -0,0 +1,52 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.dme;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+
+/**
+ * Plain data holder describing a consumer job.
+ *
+ * <p>Every field is mapped to a snake_case JSON name for both Gson ({@code @SerializedName})
+ * and Jackson ({@code @JsonProperty}). Lombok generates the no-args and all-args constructors;
+ * the all-args constructor takes the fields in declaration order, so do not reorder them.
+ * NOTE(review): presumably the body exchanged with ICS when registering a job - confirm.
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+public class ConsumerJobInfo {
+
+ // JSON "info_type_id"; defaults to the empty string.
+ @SerializedName("info_type_id")
+ @JsonProperty(value = "info_type_id", required = true)
+ public String infoTypeId = "";
+
+ // JSON "job_owner"; defaults to the empty string.
+ @SerializedName("job_owner")
+ @JsonProperty(value = "job_owner", required = true)
+ public String owner = "";
+
+ // JSON "job_definition"; untyped payload, null unless set.
+ @SerializedName("job_definition")
+ @JsonProperty(value = "job_definition", required = true)
+ public Object jobDefinition;
+
+ // JSON "job_result_uri"; defaults to the empty string.
+ @SerializedName("job_result_uri")
+ @JsonProperty(value = "job_result_uri", required = true)
+ public String jobResultUri = "";
+
+ // JSON "status_notification_uri"; the only property not marked as required.
+ @SerializedName("status_notification_uri")
+ @JsonProperty(value = "status_notification_uri", required = false)
+ public String statusNotificationUri = "";
+
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerStatusInfo.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerStatusInfo.java
new file mode 100644
index 0000000..46f1a3f
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/dme/ConsumerStatusInfo.java
@@ -0,0 +1,47 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.dme;
+
+import java.util.Collection;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+
+/**
+ * Plain data holder for a job status report.
+ *
+ * <p>Both fields are mapped to snake_case JSON names for Gson ({@code @SerializedName}) and
+ * Jackson ({@code @JsonProperty}). Lombok generates the no-args and all-args constructors; the
+ * all-args constructor takes {@code state} first and {@code producers} second.
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+public class ConsumerStatusInfo {
+
+ /** Allowed values of the {@code info_job_status} property. */
+ public enum InfoJobStatusValues {
+ REGISTERED, UNREGISTERED
+ }
+
+ // JSON "info_job_status".
+ @SerializedName("info_job_status")
+ @JsonProperty(value = "info_job_status", required = true)
+ public InfoJobStatusValues state;
+
+ // JSON "producers"; NOTE(review): presumably producer identifiers - confirm.
+ @SerializedName("producers")
+ @JsonProperty(value = "producers", required = true)
+ public Collection<String> producers;
+
+}
+
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/AbstractSimpleKafka.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/AbstractSimpleKafka.java
new file mode 100644
index 0000000..4f817a9
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/AbstractSimpleKafka.java
@@ -0,0 +1,42 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.messages;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractSimpleKafka {
+ private static final Logger log = LoggerFactory.getLogger(AbstractSimpleKafka.class);
+
+ public AbstractSimpleKafka(){
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ shutdown();
+ }
+ });
+ log.info(MessageHelper.getSimpleJSONObject("Created the Shutdown Hook").toJSONString());
+ }
+
+ public abstract void shutdown();
+ public abstract void runAlways(String topicName, KafkaMessageHandler callback) throws Exception;
+ public abstract void run(String topicName, int numberOfMessages, KafkaMessageHandler callback) throws Exception;
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/ApplicationMessageHandlerImpl.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/ApplicationMessageHandlerImpl.java
new file mode 100644
index 0000000..ed4a930
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/ApplicationMessageHandlerImpl.java
@@ -0,0 +1,39 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.messages;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link KafkaMessageHandler} that reports every received record both on standard output and
+ * through the class logger.
+ */
+public class ApplicationMessageHandlerImpl implements KafkaMessageHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(ApplicationMessageHandlerImpl.class);
+
+    /**
+     * Builds a JSON log entry for the record, then prints and logs it.
+     *
+     * @param topicName topic the record was read from
+     * @param message the consumed record; its key and value go into the log entry
+     * @throws Exception if the log entry cannot be built
+     */
+    @Override
+    public void processMessage(String topicName, ConsumerRecord<String, String> message) throws Exception {
+        // Report this class as the source (previously KafkaMessageHandlerImpl was named by mistake).
+        String source = ApplicationMessageHandlerImpl.class.getName();
+        JSONObject obj = MessageHelper.getMessageLogEntryJSON(source, topicName, message.key(), message.value());
+        String json = obj.toJSONString();
+        System.out.println(json);
+        log.info(json);
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandler.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandler.java
new file mode 100644
index 0000000..9c16f9e
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandler.java
@@ -0,0 +1,28 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.messages;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Callback for handling a single consumed Kafka record.
+ * Declared as a functional interface, so it can be supplied as a lambda or method reference.
+ */
+@FunctionalInterface
+public interface KafkaMessageHandler {
+ /**
+ * Handles one record.
+ *
+ * @param topicName name of the topic associated with the record
+ * @param message the record, with String key and String value
+ * @throws Exception if handling fails
+ */
+ void processMessage(String topicName, ConsumerRecord<String, String> message) throws Exception;
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandlerImpl.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandlerImpl.java
new file mode 100644
index 0000000..8ff36fe
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/KafkaMessageHandlerImpl.java
@@ -0,0 +1,37 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.messages;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.json.simple.JSONObject;
+
+/**
+ * {@link KafkaMessageHandler} that writes every received record to the class logger as JSON.
+ */
+public class KafkaMessageHandlerImpl implements KafkaMessageHandler {
+    private static final Logger log = LoggerFactory.getLogger(KafkaMessageHandlerImpl.class);
+
+    /**
+     * Logs the record as a JSON entry at INFO level.
+     *
+     * @param topicName topic the record was read from
+     * @param message the consumed record; its key and value go into the log entry
+     * @throws Exception if the log entry cannot be built
+     */
+    @Override
+    public void processMessage(String topicName, ConsumerRecord<String, String> message) throws Exception {
+        JSONObject logEntry = MessageHelper.getMessageLogEntryJSON(
+                KafkaMessageHandlerImpl.class.getName(), topicName, message.key(), message.value());
+        log.info(logEntry.toJSONString());
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/MessageHelper.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/MessageHelper.java
new file mode 100644
index 0000000..21063cc
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/MessageHelper.java
@@ -0,0 +1,73 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.messages;
+
+import java.util.Properties;
+import java.util.Random;
+import org.json.simple.JSONObject;
+
+/**
+ * Static helpers for building the JSON objects used in log output and for generating
+ * random strings.
+ */
+public class MessageHelper {
+
+    /** Lazily loaded configuration; filled on the first call to {@link #getProperties()}. */
+    private static Properties props;
+
+    /**
+     * Generates a random string of 10 ASCII letters and digits.
+     *
+     * @return the generated string
+     */
+    public static String getRandomString() {
+        final int length = 10;
+        StringBuilder result = new Random()
+                .ints('0', 'z' + 1) // code points from '0' to 'z' inclusive
+                .filter(MessageHelper::isAsciiAlphanumeric)
+                .limit(length)
+                .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append);
+        return result.toString();
+    }
+
+    /** Tells whether the code point is one of 0-9, A-Z or a-z. */
+    private static boolean isAsciiAlphanumeric(int codePoint) {
+        return (codePoint >= '0' && codePoint <= '9')
+                || (codePoint >= 'A' && codePoint <= 'Z')
+                || (codePoint >= 'a' && codePoint <= 'z');
+    }
+
+    /**
+     * Builds the JSON log entry for a message.
+     *
+     * @param source name reported as the origin of the entry
+     * @param topic topic name
+     * @param key message key
+     * @param message message value
+     * @return a JSON object with the keys bootstrapServers, source, topic, key and message
+     * @throws Exception if the configuration cannot be loaded
+     */
+    @SuppressWarnings("unchecked") // Using only strings
+    public static JSONObject getMessageLogEntryJSON(String source, String topic, String key, String message)
+            throws Exception {
+        JSONObject entry = new JSONObject();
+        entry.put("bootstrapServers", getProperties().getProperty("bootstrap.servers"));
+        entry.put("source", source);
+        entry.put("topic", topic);
+        entry.put("key", key);
+        entry.put("message", message);
+        return entry;
+    }
+
+    /**
+     * Wraps a text in a JSON object under the key "message".
+     *
+     * @param message text to wrap
+     * @return the JSON object
+     */
+    @SuppressWarnings("unchecked")
+    public static JSONObject getSimpleJSONObject(String message) {
+        JSONObject wrapper = new JSONObject();
+        wrapper.put("message", message);
+        return wrapper;
+    }
+
+    /**
+     * Returns the cached configuration, loading it through {@link PropertiesHelper} on first use.
+     *
+     * @return the configuration properties
+     * @throws Exception if loading fails
+     */
+    protected static Properties getProperties() throws Exception {
+        if (props != null) {
+            return props;
+        }
+        props = PropertiesHelper.getProperties();
+        return props;
+    }
+
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/PropertiesHelper.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/PropertiesHelper.java
new file mode 100644
index 0000000..18be2f8
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/messages/PropertiesHelper.java
@@ -0,0 +1,58 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.messages;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.demo.consumer.consumer.SimpleConsumer;
+
+/**
+ * Loads the Kafka client configuration from {@code config.properties} on the classpath.
+ */
+@Component
+public class PropertiesHelper {
+    private static final Logger log = LoggerFactory.getLogger(PropertiesHelper.class);
+
+    /**
+     * Reads {@code config.properties} and, when the {@code KAFKA_SERVERS} environment variable is
+     * set, overrides {@code bootstrap.servers} with its value.
+     *
+     * <p>An {@link IOException} while reading is logged and whatever was loaded so far is
+     * returned.
+     *
+     * @return the loaded properties, never {@code null}
+     * @throws Exception if {@code config.properties} is not found on the classpath
+     */
+    public static Properties getProperties() throws Exception {
+        Properties props = new Properties();
+        try (InputStream input = SimpleConsumer.class.getClassLoader().getResourceAsStream("config.properties")) {
+            if (input == null) {
+                log.error("Found no configuration file in resources");
+                throw new Exception("Sorry, unable to find config.properties");
+            }
+            props.load(input);
+            String kafkaServers = System.getenv("KAFKA_SERVERS");
+            if (kafkaServers != null) {
+                props.setProperty("bootstrap.servers", kafkaServers);
+                log.info("Env variable KAFKA_SERVERS found, adding: {}", kafkaServers);
+            } else {
+                log.info("Env variable KAFKA_SERVERS not found, defaulting to config file");
+            }
+        } catch (IOException e) {
+            // Pass the exception itself: without a placeholder the message argument was dropped.
+            log.error("Error reading configuration file: ", e);
+        }
+        return props;
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoType.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoType.java
new file mode 100644
index 0000000..45b4c7c
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoType.java
@@ -0,0 +1,49 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.repository;
+
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+/**
+ * Description of an information type.
+ *
+ * <p>Lombok generates a getter and a setter for every field, a {@code toString()} and a public
+ * builder.
+ */
+@Getter
+@Setter
+@ToString
+@Builder(access = AccessLevel.PUBLIC)
+public class InfoType {
+
+    /** Identifier of the type. */
+    private String id;
+
+    /** Name of the Kafka input topic for this type. */
+    private String kafkaInputTopic;
+
+    /** Job type used as input. */
+    private String inputJobType;
+
+    /** Untyped definition of the input job. */
+    private Object inputJobDefinition;
+
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoTypes.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoTypes.java
new file mode 100644
index 0000000..f1e83be
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/InfoTypes.java
@@ -0,0 +1,81 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.repository;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Vector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.stereotype.Component;
+
+/**
+ * In-memory registry of {@link InfoType} objects keyed by their id.
+ *
+ * <p>All accessors synchronize on this instance, and the collections they return are snapshots
+ * that stay valid after the lock has been released.
+ */
+@Component
+public class InfoTypes {
+    private static final Logger logger = LoggerFactory.getLogger(InfoTypes.class);
+
+    private final Map<String, InfoType> allTypes = new HashMap<>();
+
+    /**
+     * Creates the registry and stores the given types.
+     *
+     * @param types initial types; each one is stored under its id
+     */
+    public InfoTypes(Collection<InfoType> types) {
+        for (InfoType type : types) {
+            put(type);
+        }
+    }
+
+    /**
+     * Looks up a type.
+     *
+     * @param id type id
+     * @return the type, or {@code null} if the id is unknown
+     */
+    public synchronized InfoType get(String id) {
+        return allTypes.get(id);
+    }
+
+    /**
+     * Looks up a type that is expected to exist.
+     *
+     * @param id type id
+     * @return the type, never {@code null}
+     * @throws Exception if the id is unknown
+     */
+    public synchronized InfoType getType(String id) throws Exception {
+        InfoType type = allTypes.get(id);
+        if (type == null) {
+            // Separate the id from the status text; they used to be glued together.
+            throw new Exception("Could not find type: " + id + " " + HttpStatus.NOT_FOUND.toString());
+        }
+        return type;
+    }
+
+    /** Holder for a collection of types. */
+    public static class ConfigFile {
+        Collection<InfoType> types;
+    }
+
+    /**
+     * Stores a type under its id, replacing any type already stored under that id.
+     *
+     * @param type type to store
+     */
+    public synchronized void put(InfoType type) {
+        logger.debug("Put type: {}", type.getId());
+        allTypes.put(type.getId(), type);
+    }
+
+    /**
+     * @return a snapshot copy of all stored types
+     */
+    public synchronized Iterable<InfoType> getAll() {
+        return new Vector<>(allTypes.values());
+    }
+
+    /**
+     * @return a snapshot copy of all stored type ids
+     */
+    public synchronized Collection<String> typeIds() {
+        // Copy instead of exposing the live keySet view, which callers would otherwise
+        // iterate outside the lock while other threads modify the map.
+        return new Vector<>(allTypes.keySet());
+    }
+
+    /**
+     * @return the number of stored types
+     */
+    public synchronized int size() {
+        return allTypes.size();
+    }
+
+    /** Removes all stored types. */
+    public synchronized void clear() {
+        allTypes.clear();
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Job.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Job.java
new file mode 100644
index 0000000..ba12345
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Job.java
@@ -0,0 +1,71 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.repository;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * Immutable description of a job: its id, its {@link InfoType}, its owner and its parameters.
+ * Lombok generates {@code toString()} and the getters.
+ */
+@ToString
+public class Job {
+ /** Job parameters; instances are created through the Lombok-generated builder. */
+ @Builder
+ public static class Parameters {
+
+ /** Kafka delivery settings; Lombok generates the builder, equals and hashCode. */
+ @Builder
+ @EqualsAndHashCode
+ public static class KafkaDeliveryInfo {
+ // Topic name.
+ @Getter
+ private String topic;
+
+ // Bootstrap servers setting.
+ @Getter
+ private String bootStrapServers;
+
+ // Mapped to the JSON property "numberOfMessages" by Jackson.
+ @JsonProperty(value = "numberOfMessages")
+ @Getter
+ private int numberOfMessages;
+ }
+
+ @Getter
+ private KafkaDeliveryInfo deliveryInfo;
+ }
+
+ @Getter
+ private final String id;
+
+ @Getter
+ private final InfoType type;
+
+ @Getter
+ private final String owner;
+
+ @Getter
+ private final Parameters parameters;
+
+ /**
+ * Creates a job; the arguments are stored as given, without validation.
+ *
+ * @param id job id
+ * @param type information type of the job
+ * @param owner job owner
+ * @param parameters job parameters
+ */
+ public Job(String id, InfoType type, String owner, Parameters parameters) {
+ this.id = id;
+ this.type = type;
+ this.owner = owner;
+ this.parameters = parameters;
+ }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Jobs.java b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Jobs.java
new file mode 100644
index 0000000..13f3180
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/java/com/demo/consumer/repository/Jobs.java
@@ -0,0 +1,83 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer.repository;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Vector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.demo.consumer.repository.Job.Parameters;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * In-memory registry of {@link Job} objects keyed by their id.
+ *
+ * <p>Every access to the underlying map synchronizes on this instance.
+ */
+@Component
+public class Jobs {
+    private static final Logger logger = LoggerFactory.getLogger(Jobs.class);
+
+    /** Shared serializer used by {@link #toString()}. */
+    private static final Gson GSON = new GsonBuilder().create();
+
+    private final Map<String, Job> allJobs = new HashMap<>();
+
+    public Jobs() {
+    }
+
+    /**
+     * Looks up a job that is expected to exist.
+     *
+     * @param id job id
+     * @return the job, never {@code null}
+     * @throws Exception if the id is unknown
+     */
+    public synchronized Job getJob(String id) throws Exception {
+        Job job = allJobs.get(id);
+        if (job == null) {
+            throw new Exception("Could not find job: " + id);
+        }
+        return job;
+    }
+
+    /**
+     * Looks up a job.
+     *
+     * @param id job id
+     * @return the job, or {@code null} if the id is unknown
+     */
+    public synchronized Job get(String id) {
+        return allJobs.get(id);
+    }
+
+    /**
+     * Creates a job from the given values and stores it, replacing any job with the same id.
+     *
+     * @param id job id
+     * @param type information type of the job
+     * @param owner job owner
+     * @param parameters job parameters
+     */
+    public void addJob(String id, InfoType type, String owner, Parameters parameters) {
+        Job job = new Job(id, type, owner, parameters);
+        this.put(job);
+    }
+
+    private synchronized void put(Job job) {
+        logger.debug("Put job: {}", job.getId());
+        allJobs.put(job.getId(), job);
+    }
+
+    /**
+     * @return a snapshot copy of all stored jobs
+     */
+    public synchronized Iterable<Job> getAll() {
+        return new Vector<>(allJobs.values());
+    }
+
+    /**
+     * @return the number of stored jobs
+     */
+    public synchronized int size() {
+        return allJobs.size();
+    }
+
+    /**
+     * Removes a job.
+     *
+     * @param id job id
+     * @return the removed job, or {@code null} if the id was unknown
+     */
+    public synchronized Job delete(String id) {
+        return allJobs.remove(id);
+    }
+
+    /**
+     * @return the stored jobs serialized as a JSON object keyed by job id
+     */
+    @Override
+    public synchronized String toString() {
+        // Synchronized like every other accessor: serializing iterates the map, which must
+        // not be modified by another thread in the meantime.
+        return GSON.toJson(allJobs);
+    }
+}
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/resources/application.yaml b/sample-services/ics-producer-consumer/consumer/src/main/resources/application.yaml
new file mode 100644
index 0000000..eed8326
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/resources/application.yaml
@@ -0,0 +1,31 @@
+# ========================LICENSE_START=================================
+# Copyright (C) 2024: OpenInfra Foundation Europe
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+
+# HTTP port the consumer application listens on.
+server:
+ port : 8081
+
+# Application-specific settings.
+# NOTE(review): "time" is presumably a duration in milliseconds - confirm against the code reading it.
+vars:
+ time: 1000
+ autostart: true
+ topic: mytopic #This topic is used only in autostart
+
+spring:
+ application:
+ name: demoConsumer
+
+# Root log level for the application.
+logging:
+ level:
+ root: INFO
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/resources/config.properties b/sample-services/ics-producer-consumer/consumer/src/main/resources/config.properties
new file mode 100644
index 0000000..48763d6
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/resources/config.properties
@@ -0,0 +1,42 @@
+# ========================LICENSE_START=================================
+# O-RAN-SC
+#
+# Copyright (C) 2024: OpenInfra Foundation Europe
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+
+# The location of the Kafka server.
+# This file is loaded with java.util.Properties, which does not expand ${...} placeholders,
+# so the value must be a literal host:port list. PropertiesHelper replaces it with the value
+# of the KAFKA_SERVERS environment variable when that variable is set.
+bootstrap.servers=localhost:9092
+
+# the default group ID
+group.id=test-group
+
+# the default topic to use if one is not provided
+default.topic=magic-topic
+
+# The maximum number of records to pull off the stream every time
+# the client takes a trip out to Kafka
+max.poll.records=10
+
+# Make Kafka keep track of record reads by the consumer
+enable.auto.commit=true
+
+# The interval in milliseconds at which the offset of the last message read is committed to Kafka
+auto.commit.interval.ms=500
+
+# classes for deserializing message keys and values
+key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
+# NOTE(review): enable.idempotence is a producer setting; confirm it is needed in the consumer config
+enable.idempotence=false
diff --git a/sample-services/ics-producer-consumer/consumer/src/main/resources/logback.xml b/sample-services/ics-producer-consumer/consumer/src/main/resources/logback.xml
new file mode 100644
index 0000000..dd90b77
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/main/resources/logback.xml
@@ -0,0 +1,45 @@
+<!--
+* ========================LICENSE_START=================================
+* O-RAN-SC
+*
+* Copyright (C) 2024 OpenInfra Foundation Europe. All rights reserved.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ========================LICENSE_END===================================
+-->
+
+<!-- Logback setup: everything at INFO and above goes to a rolling file and to the console. -->
+<configuration>
+ <!-- Writes to logging.log; once the file exceeds 10MB it is rolled into
+ logging.1.log.zip .. logging.10.log.zip (fixed window of 10 archives). -->
+ <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>logging.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+ <fileNamePattern>logging.%i.log.zip</fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>10</maxIndex>
+ </rollingPolicy>
+ <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>10MB</maxFileSize>
+ </triggeringPolicy>
+ <encoder>
+ <!-- timestamp, level, short logger name, line number, message -->
+ <pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n</pattern>
+ </encoder>
+ </appender>
+ <!-- Console output using the same line format as the file appender. -->
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n</pattern>
+ </encoder>
+ </appender>
+ <root level="INFO">
+ <appender-ref ref="FILE"/>
+ <appender-ref ref="STDOUT"/>
+ </root>
+</configuration>
diff --git a/sample-services/ics-producer-consumer/consumer/src/test/java/com/demo/consumer/SimpleConsumerTest.java b/sample-services/ics-producer-consumer/consumer/src/test/java/com/demo/consumer/SimpleConsumerTest.java
new file mode 100644
index 0000000..7a7c279
--- /dev/null
+++ b/sample-services/ics-producer-consumer/consumer/src/test/java/com/demo/consumer/SimpleConsumerTest.java
@@ -0,0 +1,86 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.consumer;
+
+import com.demo.consumer.consumer.SimpleConsumer;
+import com.demo.consumer.messages.KafkaMessageHandler;
+import com.demo.consumer.messages.PropertiesHelper;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.time.Duration;
+
+import java.util.Properties;
+
+import static org.mockito.Mockito.*;
+
+// Mockito-based unit test that drives SimpleConsumer.run(...) with a stubbed PropertiesHelper.
+class SimpleConsumerTest {
+
+    private static final long TIME_OUT_MS = 1000;
+
+    @Mock
+    private KafkaConsumer<String, String> kafkaConsumer;
+
+    @InjectMocks
+    private SimpleConsumer simpleConsumer;
+
+    private AutoCloseable mockSession;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        mockSession = MockitoAnnotations.openMocks(this);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        mockSession.close();
+    }
+
+    @Test
+    void testRun() throws Exception {
+        final String topic = "testTopic";
+        final int messageCount = 10;
+        final KafkaMessageHandler handler = mock(KafkaMessageHandler.class);
+
+        // Connection settings that the stubbed PropertiesHelper.getProperties() hands back
+        final Properties kafkaProps = new Properties();
+        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
+        kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+        // The static stub is only active inside this try-with-resources scope
+        try (MockedStatic<PropertiesHelper> helperStub = Mockito.mockStatic(PropertiesHelper.class)) {
+            helperStub.when(PropertiesHelper::getProperties).thenReturn(kafkaProps);
+
+            simpleConsumer.run(topic, messageCount, handler);
+            verify(kafkaConsumer, never()).poll(Duration.ofMillis(TIME_OUT_MS));
+        }
+    }
+}
diff --git a/sample-services/ics-producer-consumer/docker-compose.yaml b/sample-services/ics-producer-consumer/docker-compose.yaml
new file mode 100644
index 0000000..d9239bc
--- /dev/null
+++ b/sample-services/ics-producer-consumer/docker-compose.yaml
@@ -0,0 +1,83 @@
+# ========================LICENSE_START=================================
+# O-RAN-SC
+#
+# Copyright (C) 2024: OpenInfra Foundation Europe
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+
+version: '2'
+
+networks:
+ my-network:
+ name: kafka
+ driver: bridge
+
+services:
+ kafka-zkless:
+ container_name: kafka-zkless
+ image: quay.io/strimzi/kafka:latest-kafka-2.8.1-amd64
+ command:
+ [
+ "sh",
+ "-c",
+ "export CLUSTER_ID=$$(bin/kafka-storage.sh random-uuid) && bin/kafka-storage.sh format -t $$CLUSTER_ID -c config/kraft/server.properties && bin/kafka-server-start.sh config/kraft/server.properties --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override listener.security.protocol.map=$${KAFKA_LISTENER_SECURITY_PROTOCOL_MAP} --override listeners=$${KAFKA_LISTENERS}",
+ ]
+ ports:
+ - "9092:9092"
+ environment:
+ LOG_DIR: "/tmp/logs"
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ KAFKA_LISTENERS: PLAINTEXT://:29092,PLAINTEXT_HOST://:9092,CONTROLLER://:9093
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-zkless:29092,PLAINTEXT_HOST://kafka-zkless:9092
+ tty: true
+ stdin_open: true
+ networks:
+ - my-network
+
+ informationcoordinator:
+ image: nexus3.o-ran-sc.org:10001/o-ran-sc/nonrtric-plt-informationcoordinatorservice:1.6.0
+ container_name: informationcoordinatorservice
+ ports:
+ - "8083:8083"
+ volumes:
+ - ./application.yaml:/opt/app/information-coordinator-service/config/application.yaml
+ networks:
+ - my-network
+
+ kafka-producer:
+ image: o-ran-sc/nonrtric-sample-icsproducer:latest
+ container_name: kafka-producer
+ environment:
+ - KAFKA_SERVERS=kafka-zkless:9092
+ ports:
+ - "8080:8080"
+ networks:
+ - my-network
+
+ kafka-consumer:
+ image: o-ran-sc/nonrtric-sample-icsconsumer:latest
+ container_name: kafka-consumer
+ environment:
+ - KAFKA_SERVERS=kafka-zkless:9092
+ ports:
+ - "8081:8081"
+ networks:
+ - my-network
+
+ curl-client:
+ image: curlimages/curl:latest
+ container_name: curl-client
+ command: ["tail", "-f", "/dev/null"]
+ networks:
+ - my-network
diff --git a/sample-services/ics-producer-consumer/docker-compose/.env b/sample-services/ics-producer-consumer/docker-compose/.env
new file mode 100644
index 0000000..257ac73
--- /dev/null
+++ b/sample-services/ics-producer-consumer/docker-compose/.env
@@ -0,0 +1,24 @@
+# ============LICENSE_START===============================================
+# Copyright (C) 2021 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+#CONTROL_PANEL
+CONTROL_PANEL_IMAGE_BASE="nexus3.o-ran-sc.org:10002/o-ran-sc/nonrtric-controlpanel"
+CONTROL_PANEL_IMAGE_TAG="2.5.0"
+
+#NONRTRIC_GATEWAY
+NONRTRIC_GATEWAY_IMAGE_BASE="nexus3.o-ran-sc.org:10002/o-ran-sc/nonrtric-gateway"
+NONRTRIC_GATEWAY_IMAGE_TAG="1.2.0"
\ No newline at end of file
diff --git a/sample-services/ics-producer-consumer/docker-compose/LICENSE.txt b/sample-services/ics-producer-consumer/docker-compose/LICENSE.txt
new file mode 100644
index 0000000..2496663
--- /dev/null
+++ b/sample-services/ics-producer-consumer/docker-compose/LICENSE.txt
@@ -0,0 +1,14 @@
+LICENSE.txt
+
+Unless otherwise specified, all software contained herein is licensed
+under the Apache License, Version 2.0 (the "Software License");
+you may not use this software except in compliance with the Software
+License. You may obtain a copy of the Software License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the Software License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the Software License for the specific language governing permissions
+and limitations under the Software License.
diff --git a/sample-services/ics-producer-consumer/docker-compose/README.md b/sample-services/ics-producer-consumer/docker-compose/README.md
new file mode 100644
index 0000000..d7542a8
--- /dev/null
+++ b/sample-services/ics-producer-consumer/docker-compose/README.md
@@ -0,0 +1,35 @@
+# License
+
+Copyright (C) 2020 Nordix Foundation.
+Licensed under the Apache License, Version 2.0 (the "License")
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+For more information about the license, please see the [LICENSE](LICENSE.txt) file.
+
+## O-RAN-SC docker-compose files
+
+These docker compose files let the user deploy the Non-RT RIC control panel components with one command.
+
+NOTE:
+Docker image URLs and tags are defined in the `.env` file.
+
+To install the Control Panel and gateway, run the following command:
+
+```shell
+docker-compose --env-file .env -f docker-compose.yaml -f control-panel/docker-compose.yaml -f nonrtric-gateway/docker-compose.yaml up -d
+```
+
+To remove the containers, use the command:
+
+```shell
+docker-compose --env-file .env -f docker-compose.yaml -f control-panel/docker-compose.yaml -f nonrtric-gateway/docker-compose.yaml down
+```
diff --git a/sample-services/ics-producer-consumer/docker-compose/control-panel/config/nginx.conf b/sample-services/ics-producer-consumer/docker-compose/control-panel/config/nginx.conf
new file mode 100644
index 0000000..66e8ac4
--- /dev/null
+++ b/sample-services/ics-producer-consumer/docker-compose/control-panel/config/nginx.conf
@@ -0,0 +1,45 @@
+# ============LICENSE_START===============================================
+# Copyright (C) 2019-2022 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+
+events{}
+
+http {
+ include /etc/nginx/mime.types;
+ resolver 127.0.0.11;
+ server {
+ listen 8080;
+ server_name localhost;
+ root /usr/share/nginx/html;
+ index index.html;
+ location /a1-policy/ {
+ set $upstream nonrtric-gateway;
+ proxy_pass http://$upstream:9090;
+ }
+ location /data-producer/{
+ set $upstream nonrtric-gateway;
+ proxy_pass http://$upstream:9090;
+ }
+ location /data-consumer/{
+ set $upstream nonrtric-gateway;
+ proxy_pass http://$upstream:9090;
+ }
+ location / {
+ try_files $uri $uri/ /index.html;
+ }
+ }
+}
diff --git a/sample-services/ics-producer-consumer/docker-compose/control-panel/docker-compose.yaml b/sample-services/ics-producer-consumer/docker-compose/control-panel/docker-compose.yaml
new file mode 100644
index 0000000..a42b7a8
--- /dev/null
+++ b/sample-services/ics-producer-consumer/docker-compose/control-panel/docker-compose.yaml
@@ -0,0 +1,32 @@
+# Copyright (C) 2020 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+version: '3.5'
+
+networks:
+ kafka:
+ external: true
+
+services:
+ policy-control-panel:
+ image: "${CONTROL_PANEL_IMAGE_BASE}:${CONTROL_PANEL_IMAGE_TAG}"
+ container_name: policy-control-panel
+ networks:
+ - kafka
+ ports:
+ - 8181:8080
+ - 8282:8082
+ volumes:
+ - ./control-panel/config/nginx.conf:/etc/nginx/nginx.conf:ro
diff --git a/sample-services/ics-producer-consumer/docker-compose/docker-compose.yaml b/sample-services/ics-producer-consumer/docker-compose/docker-compose.yaml
new file mode 100644
index 0000000..c30aa2c
--- /dev/null
+++ b/sample-services/ics-producer-consumer/docker-compose/docker-compose.yaml
@@ -0,0 +1,20 @@
+# Copyright (C) 2021 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+version: '3.5'
+
+networks:
+ kafka:
+ external: true
diff --git a/sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/config/application-nonrtricgateway.yaml b/sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/config/application-nonrtricgateway.yaml
new file mode 100644
index 0000000..c620b03
--- /dev/null
+++ b/sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/config/application-nonrtricgateway.yaml
@@ -0,0 +1,48 @@
+################################################################################
+# Copyright (c) 2021 Nordix Foundation. #
+# #
+# Licensed under the Apache License, Version 2.0 (the "License"); #
+# you may not use this file except in compliance with the License. #
+# You may obtain a copy of the License at #
+# #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+# #
+# Unless required by applicable law or agreed to in writing, software #
+# distributed under the License is distributed on an "AS IS" BASIS, #
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
+# See the License for the specific language governing permissions and #
+# limitations under the License. #
+################################################################################
+
+server:
+ port: 9090
+spring:
+ cloud:
+ gateway:
+ httpclient:
+ ssl:
+ useInsecureTrustManager: true
+ wiretap: true
+ httpserver:
+ wiretap: true
+ routes:
+ - id: A1-EI
+ uri: http://informationcoordinatorservice:8083
+ predicates:
+ - Path=/data-producer/**,/data-consumer/**
+management:
+ endpoint:
+ gateway:
+ enabled: true
+ endpoints:
+ web:
+ exposure:
+ include: "gateway,loggers,logfile,health,info,metrics,threaddump,heapdump"
+logging:
+ level:
+ ROOT: ERROR
+ org.springframework: ERROR
+ org.springframework.cloud.gateway: INFO
+ reactor.netty: INFO
+ file:
+ name: /var/log/nonrtric-gateway/application.log
diff --git a/sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/docker-compose.yaml b/sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/docker-compose.yaml
new file mode 100644
index 0000000..138229d
--- /dev/null
+++ b/sample-services/ics-producer-consumer/docker-compose/nonrtric-gateway/docker-compose.yaml
@@ -0,0 +1,33 @@
+# Copyright (C) 2021 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+version: '3.5'
+
+networks:
+ kafka:
+ external: true
+
+services:
+ nonrtric-gateway:
+ image: "${NONRTRIC_GATEWAY_IMAGE_BASE}:${NONRTRIC_GATEWAY_IMAGE_TAG}"
+ container_name: nonrtric-gateway
+ networks:
+ kafka:
+ aliases:
+ - nonrtric-gateway-container
+ ports:
+ - 9090:9090
+ volumes:
+ - ./nonrtric-gateway/config/application-nonrtricgateway.yaml:/opt/app/nonrtric-gateway/config/application.yaml:ro
diff --git a/sample-services/ics-producer-consumer/docker-composeRedPanda.yaml b/sample-services/ics-producer-consumer/docker-composeRedPanda.yaml
new file mode 100644
index 0000000..0184de4
--- /dev/null
+++ b/sample-services/ics-producer-consumer/docker-composeRedPanda.yaml
@@ -0,0 +1,38 @@
+# ========================LICENSE_START=================================
+# O-RAN-SC
+#
+# Copyright (C) 2024: OpenInfra Foundation Europe
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+
+version: '2'
+networks:
+ kafka:
+ external: true
+
+services:
+ redpanda-console:
+ container_name: redpanda-console
+ image: docker.redpanda.com/redpandadata/console:v2.4.5
+ entrypoint: /bin/sh
+ command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console'
+ environment:
+ CONFIG_FILEPATH: /tmp/config.yml
+ CONSOLE_CONFIG_FILE: |
+ kafka:
+ brokers: ["kafka-zkless:9092"]
+ ports:
+ - 8888:8080
+ networks:
+ - kafka
diff --git a/sample-services/ics-producer-consumer/producer/Dockerfile b/sample-services/ics-producer-consumer/producer/Dockerfile
new file mode 100644
index 0000000..148872a
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/Dockerfile
@@ -0,0 +1,49 @@
+#==================================================================================
+# Copyright (C) 2024: OpenInfra Foundation Europe
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This source code is part of the near-RT RIC (RAN Intelligent Controller)
+# platform project (RICP).
+#==================================================================================
+
+# Use Maven image with OpenJDK 17 for the build stage
+FROM maven:3.8.5-openjdk-17 AS maven_build
+
+# Copy Maven project files
+COPY pom.xml /tmp/
+COPY src /tmp/src/
+
+# Set working directory
+WORKDIR /tmp/
+
+# Build the Maven project; the resulting JAR under /tmp/target/ is copied into the runtime stage below
+RUN mvn package
+
+# Use a separate image with OpenJDK 17 for the runtime stage
+FROM openjdk:17-jdk-slim
+
+# Expose port 8080
+EXPOSE 8080
+# Optional build argument, re-exported as an environment variable (empty unless --build-arg is given)
+ARG KAFKA_SERVERS
+ENV KAFKA_SERVERS=${KAFKA_SERVERS}
+
+# Set the working directory
+WORKDIR /app
+
+# Copy the JAR file from the maven_build stage to the runtime stage
+COPY --from=maven_build /tmp/target/producer-0.0.1.jar /app/producer-0.0.1.jar
+
+# Command to run the application (relative JAR path is resolved against WORKDIR /app)
+CMD ["java", "-jar", "producer-0.0.1.jar"]
diff --git a/sample-services/ics-producer-consumer/producer/Makefile b/sample-services/ics-producer-consumer/producer/Makefile
new file mode 100644
index 0000000..301526a
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/Makefile
@@ -0,0 +1,51 @@
+# ========================LICENSE_START=================================
+# O-RAN-SC
+#
+# Copyright (C) 2024: OpenInfra Foundation Europe
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+
+# Define variables
+IMAGE_NAME = "o-ran-sc/nonrtric-sample-icsproducer"
+# Container names may not contain '/', so the container gets its own name
+# instead of reusing IMAGE_NAME; the run and stop targets both refer to it.
+CONTAINER_NAME = nonrtric-sample-icsproducer
+DOCKERFILE = Dockerfile
+
+.PHONY: all jar build run stop clean
+
+# Default target
+all: build run
+
+# Target to build the Maven JAR
+jar:
+	mvn clean package
+
+# Target to build the Docker image
+build:
+	docker build -t $(IMAGE_NAME) -f $(DOCKERFILE) .
+
+# Target to run the Docker container (named so `stop` can find it; --rm frees the name on exit)
+run:
+	docker run --rm --name $(CONTAINER_NAME) -p 8080:8080 $(IMAGE_NAME)
+
+# Target to stop and remove the Docker container
+# `|| true` keeps the target idempotent when no such container exists
+stop:
+	docker stop $(CONTAINER_NAME) || true
+	docker rm $(CONTAINER_NAME) || true
+
+# Target to clean up
+clean: stop
+	docker rmi $(IMAGE_NAME) || true
diff --git a/sample-services/ics-producer-consumer/producer/container-tag.yaml b/sample-services/ics-producer-consumer/producer/container-tag.yaml
new file mode 100644
index 0000000..48290af
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/container-tag.yaml
@@ -0,0 +1,23 @@
+# ========================LICENSE_START=================================
+# O-RAN-SC
+#
+# Copyright (C) 2024: OpenInfra Foundation Europe
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+
+# The Jenkins job requires a tag to build the Docker image.
+# By default this file is in the docker build directory,
+# but the location can be configured in the JJB template.
+---
+tag: 0.0.1
\ No newline at end of file
diff --git a/sample-services/ics-producer-consumer/producer/pom.xml b/sample-services/ics-producer-consumer/producer/pom.xml
new file mode 100644
index 0000000..d78f370
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+* ========================LICENSE_START=================================
+* O-RAN-SC
+*
+* Copyright (C) 2024 OpenInfra Foundation Europe. All rights reserved.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ========================LICENSE_END===================================
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>3.2.3</version>
+ <relativePath/>
+ </parent>
+
+ <groupId>com.demo.producer</groupId>
+ <artifactId>producer</artifactId>
+ <version>0.0.1</version>
+ <packaging>jar</packaging>
+
+ <properties>
+ <maven.compiler.source>17</maven.compiler.source>
+ <maven.compiler.target>17</maven.compiler.target>
+ </properties>
+
+ <dependencies>
+ <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>3.7.0</version>
+ <!--<scope>provided</scope> -->
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>1.18.24</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.10.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <configuration>
+ <mainClass>com.demo.producer.Application</mainClass>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/Application.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/Application.java
new file mode 100644
index 0000000..fc389f4
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/Application.java
@@ -0,0 +1,56 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.annotation.EnableAsync;
+
+import com.demo.producer.messages.KafkaMessageHandlerImpl;
+import com.demo.producer.producer.SimpleProducer;
+
+// Spring Boot entry point; may call SimpleProducer.runAlways once the application is ready.
+@EnableAsync
+@SpringBootApplication
+public class Application {
+    @Value("${vars.topic}")
+    private String topic;
+    @Value("${vars.autostart:false}")
+    private boolean autostart;
+    @Autowired
+    private SimpleProducer simpleProducer;
+
+    public static void main(String[] args) throws Exception {
+        SpringApplication.run(Application.class, args);
+    }
+
+    // Invoked on ApplicationReadyEvent: when vars.autostart is true, hands the
+    // configured topic and a new KafkaMessageHandlerImpl to SimpleProducer.runAlways.
+    @EventListener(ApplicationReadyEvent.class)
+    public void checkAutoRun() throws Exception {
+        if (!autostart) { return; }
+        simpleProducer.runAlways(topic, new KafkaMessageHandlerImpl());
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ProducerController.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ProducerController.java
new file mode 100644
index 0000000..5bc6821
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ProducerController.java
@@ -0,0 +1,137 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.controllers;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.demo.producer.repository.InfoType;
+import com.demo.producer.repository.InfoTypes;
+import com.demo.producer.repository.Job.Parameters;
+import com.demo.producer.repository.Jobs;
+import com.demo.producer.dme.ProducerJobInfo;
+import com.demo.producer.messages.KafkaMessageHandlerImpl;
+import com.demo.producer.producer.SimpleProducer;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/** REST endpoints under /producer: test publishing, job callbacks (create, list, delete) and supervision. */
+@RestController
+@RequestMapping(path = "/producer", produces = "application/json")
+public class ProducerController {
+    private static final Logger log = LoggerFactory.getLogger(ProducerController.class);
+    private static final Gson gson = new GsonBuilder().create();
+
+    private final Jobs jobs;
+    private final InfoTypes types;
+    private String topicName = "mytopic";
+
+    /** Stores the repositories and registers the built-in info type "type1" on the default topic. */
+    public ProducerController(@Autowired Jobs jobs, @Autowired InfoTypes types) {
+        this.jobs = jobs;
+        this.types = types;
+        InfoType type1 = InfoType.builder().build();
+        type1.setId("type1");
+        type1.setKafkaInputTopic(topicName);
+        type1.setInputJobType("type1");
+        type1.setInputJobDefinition(null);
+        types.put(type1);
+    }
+
+    /** Sends numberOfMessages messages on the default topic; logs the cause and answers 500 on failure. */
+    @GetMapping("/publish/{numberOfMessages}")
+    public ResponseEntity<?> publishMessage(@PathVariable int numberOfMessages) {
+        try {
+            new SimpleProducer().run(topicName, numberOfMessages, new KafkaMessageHandlerImpl());
+            return ResponseEntity.ok("message published successfully ..");
+        } catch (Exception ex) {
+            log.error("Error publishing " + numberOfMessages + " messages on " + topicName, ex);
+            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
+        }
+    }
+
+    /** Job callback with the job id in the path; the job itself is read from the request body. */
+    @PostMapping("/job/{infoJobId}")
+    public void jobCallback(@RequestBody String requestBody, @PathVariable String infoJobId) {
+        registerJob(requestBody);
+    }
+
+    /** Job callback without a job id in the path. */
+    @PostMapping("/job")
+    public void jobCallbackNoId(@RequestBody String requestBody) {
+        registerJob(requestBody);
+    }
+
+    /** Parses a ProducerJobInfo body and stores it as a job; failures while storing are logged, not rethrown. */
+    private void registerJob(String requestBody) {
+        ProducerJobInfo request = gson.fromJson(requestBody, ProducerJobInfo.class);
+        try {
+            // Concatenate instead of calling toString(): Gson returns null for empty or null JSON.
+            log.info("Adding producer job info " + request);
+            Parameters parameters = gson.fromJson(gson.toJson(request.jobData), Parameters.class);
+            this.jobs.addJob(request.id, types.getType(request.typeId), request.owner, parameters);
+        } catch (Exception e) {
+            log.error("Error adding producer job info: " + request, e); // the exception itself, so SLF4J logs it
+        }
+    }
+
+    /** Lists the stored jobs as text; answers 500 with the error message on failure. */
+    @GetMapping("/job")
+    public ResponseEntity<String> getJobs() {
+        try {
+            log.info("Get all jobs");
+            return new ResponseEntity<>(this.jobs.getAll().toString(), HttpStatus.OK);
+        } catch (Exception e) {
+            log.error("Error finding jobs", e);
+            return new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    /** Deletes the job with the given id; answers 500 with the error message on failure. */
+    @DeleteMapping("/job/{infoJobId}")
+    public ResponseEntity<String> deleteJob(@PathVariable String infoJobId) {
+        try {
+            log.info("Delete Job " + infoJobId);
+            this.jobs.delete(infoJobId);
+            return new ResponseEntity<>("Deleted job:" + infoJobId, HttpStatus.OK);
+        } catch (Exception e) {
+            log.error("Error deleting job " + infoJobId, e);
+            return new ResponseEntity<>(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR);
+        }
+    }
+
+    /** Supervision callback; always answers 200 "Ok". */
+    @GetMapping("/supervision")
+    public ResponseEntity<String> getSupervision() {
+        log.info("Get Supervision");
+        return new ResponseEntity<>("Ok", HttpStatus.OK);
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ThreadsController.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ThreadsController.java
new file mode 100644
index 0000000..e503b84
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/controllers/ThreadsController.java
@@ -0,0 +1,93 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.controllers;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RestController;
+import com.demo.producer.messages.KafkaMessageHandlerImpl;
+import com.demo.producer.messages.ApplicationMessageHandlerImpl;
+import com.demo.producer.producer.SimpleProducer;
+
+/** REST endpoints that run the injected SimpleProducer on a background thread and stop it again. */
+@RestController
+public class ThreadsController {
+    private static final Logger log = LoggerFactory.getLogger(ThreadsController.class);
+    @Autowired
+    private SimpleProducer simpleProducer;
+    private Thread producerThread;
+
+    /** Starts a thread that produces on topicName without a message limit; the reply only confirms the start. */
+    @Async
+    @GetMapping("/startProducer/{topicName}")
+    public CompletableFuture<String> startProducer(@PathVariable("topicName") String topicName) {
+        return startProducerThread(topicName, () -> {
+            try {
+                simpleProducer.runAlways(topicName, new ApplicationMessageHandlerImpl());
+            } catch (Exception e) {
+                log.error("Error starting producer on: " + topicName, e); // the exception itself, so SLF4J logs it
+            }
+        });
+    }
+
+    /** Shuts the producer down when the most recently started thread is still alive. */
+    @GetMapping("/stopProducer")
+    public String stopProducer() {
+        if (producerThread == null || !producerThread.isAlive()) {
+            return "No active producer to stop";
+        }
+        try {
+            simpleProducer.shutdown();
+        } catch (Exception e) {
+            log.error("Error stopping producer Thread", e);
+        }
+        return "Producer stopped successfully";
+    }
+
+    /** Starts a thread that produces numberOfMessages messages on topicName. */
+    @GetMapping("/publish/{numberOfMessages}/on/{topicName}")
+    public CompletableFuture<String> publishNMessage(@PathVariable int numberOfMessages, @PathVariable String topicName) {
+        return startProducerThread(topicName, () -> {
+            try {
+                simpleProducer.run(topicName, numberOfMessages, new KafkaMessageHandlerImpl());
+            } catch (Exception e) {
+                log.error("Error producing " + numberOfMessages + " on " + topicName, e);
+            }
+        });
+    }
+
+    /** Runs task on a new thread, remembers that thread for stopProducer() and reports the start. */
+    private CompletableFuture<String> startProducerThread(String topicName, Runnable task) {
+        try {
+            producerThread = new Thread(task);
+            producerThread.start();
+            return CompletableFuture.completedFuture("Producer started for topic: " + topicName);
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerInfoTypeInfo.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerInfoTypeInfo.java
new file mode 100644
index 0000000..5f87f4d
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerInfoTypeInfo.java
@@ -0,0 +1,41 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.dme;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
+
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+
+/** JSON-mapped holder for an info type's job data schema and its type-specific information;
+ *  the Gson and Jackson annotations on each field carry the same property name. */
+@AllArgsConstructor
+@NoArgsConstructor
+public class ProducerInfoTypeInfo {
+    @JsonProperty(value = "info_job_data_schema", required = true)
+    @SerializedName("info_job_data_schema")
+    public Object jobDataSchema;
+
+    @JsonProperty(value = "info_type_information", required = true)
+    @SerializedName("info_type_information")
+    public Object typeSpecificInformation;
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerJobInfo.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerJobInfo.java
new file mode 100644
index 0000000..c55d4c5
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerJobInfo.java
@@ -0,0 +1,53 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.dme;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
+
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+
+/** JSON-mapped description of an info job: job id, type id, free-form job data, owner and
+ *  last-updated value. The four String fields default to "" and jobData has no default. */
+@AllArgsConstructor
+@NoArgsConstructor
+public class ProducerJobInfo {
+    @JsonProperty("info_job_identity")
+    @SerializedName("info_job_identity")
+    public String id = "";
+
+    @JsonProperty("info_type_identity")
+    @SerializedName("info_type_identity")
+    public String typeId = "";
+
+    @JsonProperty("info_job_data")
+    @SerializedName("info_job_data")
+    public Object jobData;
+
+    @JsonProperty("owner")
+    @SerializedName("owner")
+    public String owner = "";
+
+    @JsonProperty("last_updated")
+    @SerializedName("last_updated")
+    public String lastUpdated = "";
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerRegistrationInfo.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerRegistrationInfo.java
new file mode 100644
index 0000000..5f279d2
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/dme/ProducerRegistrationInfo.java
@@ -0,0 +1,47 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.dme;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
+
+import java.util.Collection;
+
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+
+/** JSON-mapped producer registration data: the supported info type ids, the job callback URL
+ *  and the producer supervision callback URL, each marked as required for Jackson. */
+@AllArgsConstructor
+@NoArgsConstructor
+public class ProducerRegistrationInfo {
+    @JsonProperty(value = "supported_info_types", required = true)
+    @SerializedName("supported_info_types")
+    public Collection<String> supportedTypeIds;
+
+    @JsonProperty(value = "info_job_callback_url", required = true)
+    @SerializedName("info_job_callback_url")
+    public String jobCallbackUrl;
+
+    @JsonProperty(value = "info_producer_supervision_callback_url", required = true)
+    @SerializedName("info_producer_supervision_callback_url")
+    public String producerSupervisionCallbackUrl;
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/AbstractSimpleKafka.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/AbstractSimpleKafka.java
new file mode 100644
index 0000000..d00d074
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/AbstractSimpleKafka.java
@@ -0,0 +1,43 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.messages;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/** Base class of the sample Kafka clients; every instance registers a JVM shutdown hook for itself. */
+public abstract class AbstractSimpleKafka {
+    private static final Logger log = LoggerFactory.getLogger(AbstractSimpleKafka.class);
+
+    /** Registers a JVM shutdown hook that calls shutdown(), then logs that the hook was created. */
+    public AbstractSimpleKafka() {
+        Thread hook = new Thread(this::shutdown);
+        Runtime.getRuntime().addShutdownHook(hook);
+        log.info(MessageHelper.getSimpleJSONObject("Created the Shutdown Hook").toJSONString());
+    }
+
+    /** Invoked by the shutdown hook registered in the constructor. */
+    public abstract void shutdown();
+    // Two run variants: one without a message count, one limited to numberOfMessages.
+    public abstract void runAlways(String topicName, KafkaMessageHandler callback) throws Exception;
+    public abstract void run(String topicName, int numberOfMessages, KafkaMessageHandler callback) throws Exception;
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/ApplicationMessageHandlerImpl.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/ApplicationMessageHandlerImpl.java
new file mode 100644
index 0000000..beae794
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/ApplicationMessageHandlerImpl.java
@@ -0,0 +1,39 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.messages;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Message handler that writes each record's JSON log entry to standard output and to the log. */
+public class ApplicationMessageHandlerImpl implements KafkaMessageHandler {
+    private static final Logger log = LoggerFactory.getLogger(ApplicationMessageHandlerImpl.class);
+    @Override
+    public void processMessage(String topicName, ConsumerRecord<String, String> message) throws Exception {
+        // Report this class as the source; the entry used to name KafkaMessageHandlerImpl (copy-paste slip).
+        String source = ApplicationMessageHandlerImpl.class.getName();
+        JSONObject obj = MessageHelper.getMessageLogEntryJSON(source, topicName, message.key(), message.value());
+        System.out.println(obj.toJSONString());
+        log.info(obj.toJSONString());
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandler.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandler.java
new file mode 100644
index 0000000..814c88c
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandler.java
@@ -0,0 +1,28 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.messages;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+@FunctionalInterface
+public interface KafkaMessageHandler { // callback implemented by the message handler classes of this package
+ void processMessage(String topicName, ConsumerRecord<String, String> message) throws Exception; // handles one record; topicName is passed alongside it
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandlerImpl.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandlerImpl.java
new file mode 100644
index 0000000..06f7026
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/KafkaMessageHandlerImpl.java
@@ -0,0 +1,37 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.messages;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.json.simple.JSONObject;
+
+/** Message handler that logs each record as a JSON log entry naming this class as the source. */
+public class KafkaMessageHandlerImpl implements KafkaMessageHandler {
+    private static final Logger log = LoggerFactory.getLogger(KafkaMessageHandlerImpl.class);
+    @Override
+    public void processMessage(String topicName, ConsumerRecord<String, String> message) throws Exception {
+        JSONObject entry = MessageHelper.getMessageLogEntryJSON(KafkaMessageHandlerImpl.class.getName(),
+                topicName, message.key(), message.value());
+        log.info(entry.toJSONString());
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/MessageHelper.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/MessageHelper.java
new file mode 100644
index 0000000..7a5e45b
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/MessageHelper.java
@@ -0,0 +1,71 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.messages;
+
+import java.util.Properties;
+import java.util.Random;
+import org.json.simple.JSONObject;
+
+/** Static helpers that create the random payloads and the JSON log entries used by the sample producer. */
+public class MessageHelper {
+    private static Properties props;
+    /** Returns ten random ASCII digits and letters; punctuation between those ranges is skipped. */
+    public static String getRandomString() {
+        Random random = new Random();
+        StringBuilder text = new StringBuilder();
+        while (text.length() < 10) {
+            int candidate = '0' + random.nextInt('z' - '0' + 1); // any code from '0' to 'z'
+            if (candidate <= '9' || (candidate >= 'A' && candidate <= 'Z') || candidate >= 'a') {
+                text.append((char) candidate);
+            }
+        }
+        return text.toString();
+    }
+
+    /** Builds a log entry from the arguments plus the bootstrap.servers value of the cached properties. */
+    @SuppressWarnings("unchecked") // Using only strings
+    public static JSONObject getMessageLogEntryJSON(String source, String topic, String key, String message)
+            throws Exception {
+        JSONObject entry = new JSONObject();
+        entry.put("bootstrapServers", getProperties().getProperty("bootstrap.servers"));
+        entry.put("source", source);
+        entry.put("topic", topic);
+        entry.put("key", key);
+        entry.put("message", message);
+        return entry;
+    }
+
+    /** Wraps message in a JSON object under the key "message". */
+    @SuppressWarnings("unchecked")
+    public static JSONObject getSimpleJSONObject(String message) {
+        JSONObject wrapper = new JSONObject();
+        wrapper.put("message", message);
+        return wrapper;
+    }
+
+    /** Returns the properties, fetching them from PropertiesHelper on first use and reusing them afterwards. */
+    protected static Properties getProperties() throws Exception {
+        if (props == null) {
+            props = PropertiesHelper.getProperties();
+        }
+        return props;
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/PropertiesHelper.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/PropertiesHelper.java
new file mode 100644
index 0000000..7dc2b1e
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/messages/PropertiesHelper.java
@@ -0,0 +1,58 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.messages;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.demo.producer.producer.SimpleProducer;
+
+/** Loads the client settings from config.properties on the classpath. */
+@Component
+public class PropertiesHelper {
+    private static final Logger log = LoggerFactory.getLogger(PropertiesHelper.class);
+    /** Reads config.properties; env KAFKA_SERVERS, when set, replaces bootstrap.servers; throws if the file is missing. */
+    public static Properties getProperties() throws Exception {
+        Properties props = new Properties();
+        try (InputStream input = SimpleProducer.class.getClassLoader().getResourceAsStream("config.properties")) {
+            if (input == null) {
+                log.error("Found no configuration file in resources");
+                throw new Exception("Sorry, unable to find config.properties");
+            }
+            props.load(input);
+            String kafkaServers = System.getenv("KAFKA_SERVERS");
+            if (kafkaServers != null) {
+                props.setProperty("bootstrap.servers", kafkaServers);
+                log.info("Env variable KAFKA_SERVERS found, adding: " + kafkaServers);
+            } else {
+                log.info("Env variable KAFKA_SERVERS not found, defaulting to config file");
+            }
+        } catch (IOException e) {
+            log.error("Error reading configuration file: ", e); // the exception itself; SLF4J dropped the bare message
+        }
+        return props;
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/producer/SimpleProducer.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/producer/SimpleProducer.java
new file mode 100644
index 0000000..6c9858b
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/producer/SimpleProducer.java
@@ -0,0 +1,112 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.producer.producer;
+
+import java.util.UUID;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.json.simple.JSONObject;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import com.demo.producer.messages.AbstractSimpleKafka;
+import com.demo.producer.messages.KafkaMessageHandler;
+import com.demo.producer.messages.MessageHelper;
+import com.demo.producer.messages.PropertiesHelper;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/** Kafka producer bean that sends random string messages, sleeping vars.time milliseconds after each one. */
+@Component
+@Getter
+@Setter
+public class SimpleProducer extends AbstractSimpleKafka {
+    private static final Logger log = LoggerFactory.getLogger(SimpleProducer.class);
+
+    @Value("${vars.time:1000}")
+    private int TIME;
+    private KafkaProducer<String, String> kafkaProducer;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private String topicName = null;
+
+    /** Sends up to numberOfMessages messages to topicName, stopping early on shutdown(), then shuts down. */
+    public void run(String topicName, int numberOfMessages, KafkaMessageHandler callback) throws Exception {
+        closed.set(false);
+        for (int i = 0; i < numberOfMessages && !closed.get(); i++) {
+            if (this.getTopicName() == null) {
+                this.setTopicName(topicName);
+            }
+            this.send(topicName, UUID.randomUUID().toString(), MessageHelper.getRandomString());
+            Thread.sleep(TIME);
+        }
+        this.shutdown();
+    }
+
+    /** Keeps sending messages to topicName until shutdown() is called. */
+    public void runAlways(String topicName, KafkaMessageHandler callback) throws Exception {
+        closed.set(false);
+        // Honour the flag shutdown() sets; the loop used to end only by failing on the closed producer.
+        while (!closed.get()) {
+            this.send(topicName, UUID.randomUUID().toString(), MessageHelper.getRandomString());
+            Thread.sleep(TIME);
+        }
+    }
+
+    private void setTopicName(String topicName) { this.topicName = topicName; }
+    private String getTopicName() { return this.topicName; }
+
+    /** Logs the message as a JSON entry and hands it to the Kafka producer. */
+    protected void send(String topicName, String key, String message) throws Exception {
+        String source = SimpleProducer.class.getName();
+        JSONObject obj = MessageHelper.getMessageLogEntryJSON(source, topicName, key, message);
+        log.info(obj.toJSONString());
+        getKafkaProducer().send(new ProducerRecord<>(topicName, key, message));
+    }
+
+    /** Returns the current producer, creating one from PropertiesHelper's settings when there is none. */
+    private KafkaProducer<String, String> getKafkaProducer() throws Exception {
+        if (this.kafkaProducer == null) {
+            this.kafkaProducer = new KafkaProducer<>(PropertiesHelper.getProperties());
+        }
+        return this.kafkaProducer;
+    }
+
+    /** Stops the run loops and closes the producer if one exists; the next send creates a fresh producer. */
+    public void shutdown() {
+        closed.set(true);
+        // Read the field directly: getKafkaProducer() would build a producer just to close it.
+        KafkaProducer<String, String> current = this.kafkaProducer;
+        this.kafkaProducer = null; // a closed producer left here would make every later send fail
+        try {
+            log.info(MessageHelper.getSimpleJSONObject("Shutting down producer").toJSONString());
+            if (current != null) {
+                current.close();
+            }
+        } catch (Exception e) {
+            log.error("Error shutting down the Producer ", e);
+        }
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoType.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoType.java
new file mode 100644
index 0000000..9f9c0cf
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoType.java
@@ -0,0 +1,49 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.repository;
+
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+/**
+ * Describes one information type handled by the producer.
+ * Lombok generates the builder, toString() and a getter/setter pair for every field.
+ */
+@ToString
+@Getter
+@Setter
+@Builder(access = AccessLevel.PUBLIC)
+public class InfoType {
+    // Identifier of the type.
+    private String id;
+
+    // NOTE(review): presumably the Kafka topic this type is read from - confirm with the callers.
+    private String kafkaInputTopic;
+
+    // NOTE(review): presumably the type of the job feeding this type - confirm with the callers.
+    private String inputJobType;
+
+    // Definition of the input job, kept as an untyped Object.
+    private Object inputJobDefinition;
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoTypes.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoTypes.java
new file mode 100644
index 0000000..b7a9914
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/InfoTypes.java
@@ -0,0 +1,81 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.repository;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Vector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.stereotype.Component;
+
+// In-memory registry of InfoType objects keyed by id; every access to the map is synchronized on this instance.
+@Component
+public class InfoTypes {
+    private static final Logger logger = LoggerFactory.getLogger(InfoTypes.class);
+
+    private Map<String, InfoType> allTypes = new HashMap<>();
+    // Registers every type of the given collection under its id.
+    public InfoTypes(Collection<InfoType> types) {
+        types.forEach(this::put);
+    }
+    // Returns the type with the given id, or null when it is not registered.
+    public synchronized InfoType get(String id) {
+        return allTypes.get(id);
+    }
+    // Returns the type with the given id; throws an Exception when it is not registered.
+    public synchronized InfoType getType(String id) throws Exception {
+        InfoType type = allTypes.get(id);
+        if (type == null) {
+            // Keep the status apart from the id; it used to be appended to the id without any separator.
+            throw new Exception("Could not find type: " + id + " (" + HttpStatus.NOT_FOUND + ")");
+        }
+        return type;
+    }
+    // NOTE(review): presumably mirrors the layout of a parsed configuration file - confirm where it is used.
+    public static class ConfigFile {
+        Collection<InfoType> types;
+    }
+    // Stores the type under its own id, replacing any previous entry with that id.
+    public synchronized void put(InfoType type) {
+        logger.debug("Put type: {}", type.getId());
+        allTypes.put(type.getId(), type);
+    }
+    // Returns a snapshot copy of all types, so callers can iterate without holding the lock.
+    public synchronized Iterable<InfoType> getAll() {
+        return new Vector<>(allTypes.values());
+    }
+    // Returns a snapshot copy of the ids; the live keySet() view would escape the lock and mirror later changes.
+    public synchronized Collection<String> typeIds() {
+        return new Vector<>(allTypes.keySet());
+    }
+    // Returns the number of registered types.
+    public synchronized int size() {
+        return allTypes.size();
+    }
+    // Removes every registered type.
+    public synchronized void clear() {
+        allTypes.clear();
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Job.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Job.java
new file mode 100644
index 0000000..cafddde
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Job.java
@@ -0,0 +1,71 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.repository;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
+@ToString
+public class Job { // One job: id, info type, owner and parameters; all four fields are final and set in the constructor.
+ @Builder
+ public static class Parameters {
+ // Kafka delivery settings nested inside the job parameters.
+ @Builder
+ @EqualsAndHashCode
+ public static class KafkaDeliveryInfo {
+ @Getter
+ private String topic;
+ // NOTE(review): presumably the Kafka bootstrap server list - confirm against the job JSON.
+ @Getter
+ private String bootStrapServers;
+ // Mapped explicitly to the JSON property "numberOfMessages".
+ @JsonProperty(value = "numberOfMessages")
+ @Getter
+ private int numberOfMessages;
+ }
+ // The delivery settings, exposed through a Lombok getter.
+ @Getter
+ private KafkaDeliveryInfo deliveryInfo;
+ }
+ // Job identifier.
+ @Getter
+ private final String id;
+ // The InfoType passed in for this job.
+ @Getter
+ private final InfoType type;
+ // NOTE(review): presumably the owner named by whoever created the job - confirm.
+ @Getter
+ private final String owner;
+ // Job parameters, including the Kafka delivery info.
+ @Getter
+ private final Parameters parameters;
+ // Plain constructor: stores the four arguments unchanged.
+ public Job(String id, InfoType type, String owner, Parameters parameters) {
+ this.id = id;
+ this.type = type;
+ this.owner = owner;
+ this.parameters = parameters;
+ }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Jobs.java b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Jobs.java
new file mode 100644
index 0000000..f9cffd8
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/java/com/demo/producer/repository/Jobs.java
@@ -0,0 +1,83 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer.repository;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Vector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.demo.producer.repository.Job.Parameters;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+// In-memory store of Job objects keyed by job id; every access to the map is synchronized on this instance.
+@Component
+public class Jobs {
+    private static final Logger logger = LoggerFactory.getLogger(Jobs.class);
+
+    private Map<String, Job> allJobs = new HashMap<>();
+
+    // Returns the job with the given id; throws an Exception when no such job is stored.
+    public synchronized Job getJob(String id) throws Exception {
+        Job job = allJobs.get(id);
+        if (job == null) {
+            throw new Exception("Could not find job: " + id);
+        }
+        return job;
+    }
+    // Returns the job with the given id, or null when no such job is stored.
+    public synchronized Job get(String id) {
+        return allJobs.get(id);
+    }
+    // Creates a Job from the arguments and stores it, replacing any job with the same id.
+    public void addJob(String id, InfoType type, String owner, Parameters parameters) {
+        Job job = new Job(id, type, owner, parameters);
+        this.put(job);
+    }
+    // Stores the job under its own id.
+    private synchronized void put(Job job) {
+        logger.debug("Put job: {}", job.getId());
+        allJobs.put(job.getId(), job);
+    }
+    // Returns a snapshot copy of all jobs, so callers can iterate without holding the lock.
+    public synchronized Iterable<Job> getAll() {
+        return new Vector<>(allJobs.values());
+    }
+    // Returns the number of stored jobs.
+    public synchronized int size() {
+        return allJobs.size();
+    }
+    // Removes the job with the given id and returns it, or null when it was not stored.
+    public synchronized Job delete(String id) {
+        return allJobs.remove(id);
+    }
+    // Serialises the whole map to JSON. Synchronized like every other accessor: Gson iterates the
+    // HashMap, which must not be modified by a concurrent put()/delete() in the meantime.
+    @Override
+    public synchronized String toString() {
+        Gson gson = new GsonBuilder().create();
+        return gson.toJson(allJobs);
+    }
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/main/resources/application.yaml b/sample-services/ics-producer-consumer/producer/src/main/resources/application.yaml
new file mode 100644
index 0000000..31966df
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/resources/application.yaml
@@ -0,0 +1,28 @@
+# ============LICENSE_START===============================================
+# Copyright (C) 2024: OpenInfra Foundation Europe
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+vars:
+ time: 1000
+ autostart: true
+ topic: mytopic #This topic is used only in autostart
+
+spring:
+ application:
+ name: demoProducer
+
+logging:
+ level:
+ root: INFO
\ No newline at end of file
diff --git a/sample-services/ics-producer-consumer/producer/src/main/resources/config.properties b/sample-services/ics-producer-consumer/producer/src/main/resources/config.properties
new file mode 100644
index 0000000..83e5539
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/resources/config.properties
@@ -0,0 +1,44 @@
+# ========================LICENSE_START=================================
+# O-RAN-SC
+#
+# Copyright (C) 2024: OpenInfra Foundation Europe
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+
+# The location of the Kafka server
+bootstrap.servers=${KAFKA_SERVERS:localhost:9092}
+
+# the default group ID
+group.id=test-group
+
+# the default topic to use if one is not provided
+default.topic=magic-topic
+
+# The number of records to pull off the stream every time
+# the client takes a trip out to Kafka
+max.poll.records=10
+
+# Make Kafka keep track of record reads by the consumer
+enable.auto.commit=true
+
+# The interval in milliseconds at which the consumer auto-commits the offset of the last message read
+auto.commit.interval.ms=500
+
+# classes for serializing and deserializing messages
+key.serializer=org.apache.kafka.common.serialization.StringSerializer
+value.serializer=org.apache.kafka.common.serialization.StringSerializer
+key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
+enable.idempotence=false
diff --git a/sample-services/ics-producer-consumer/producer/src/main/resources/logback.xml b/sample-services/ics-producer-consumer/producer/src/main/resources/logback.xml
new file mode 100644
index 0000000..3dbfe39
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/main/resources/logback.xml
@@ -0,0 +1,44 @@
+<!--
+* ========================LICENSE_START=================================
+* O-RAN-SC
+*
+* Copyright (C) 2024 OpenInfra Foundation Europe. All rights reserved.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ========================LICENSE_END===================================
+-->
+<configuration>
+ <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>logging.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+ <fileNamePattern>logging.%i.log.zip</fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>10</maxIndex>
+ </rollingPolicy>
+ <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>10MB</maxFileSize>
+ </triggeringPolicy>
+ <encoder>
+ <pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n</pattern>
+ </encoder>
+ </appender>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n</pattern>
+ </encoder>
+ </appender>
+ <root level="INFO">
+ <appender-ref ref="FILE"/>
+ <appender-ref ref="STDOUT"/>
+ </root>
+</configuration>
diff --git a/sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/SimpleProducerTest.java b/sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/SimpleProducerTest.java
new file mode 100644
index 0000000..da77873
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/SimpleProducerTest.java
@@ -0,0 +1,105 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer;
+
+import com.demo.producer.producer.SimpleProducer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+
+import com.demo.producer.messages.KafkaMessageHandler;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+// Unit tests for SimpleProducer, run against a Mockito mock of the KafkaProducer.
+class SimpleProducerTest {
+    private final int wait = 1000;
+    private final String topicName = "testTopic";
+
+    @Mock
+    private KafkaProducer<String, String> kafkaProducer;
+
+    @InjectMocks
+    @Autowired
+    private SimpleProducer simpleProducer;
+
+    private AutoCloseable closable;
+
+    @BeforeEach
+    void setUp() {
+        closable = MockitoAnnotations.openMocks(this);
+    }
+
+    @AfterEach
+    public void close() throws Exception {
+        closable.close();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked") //sending only Strings
+    void testRun() throws Exception {
+        int numberOfMessages = 10;
+        KafkaMessageHandler callback = mock(KafkaMessageHandler.class);
+
+        simpleProducer.run(topicName, numberOfMessages, callback);
+
+        verify(kafkaProducer, times(numberOfMessages)).send(any(ProducerRecord.class));
+        verify(kafkaProducer, times(1)).close();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked") //sending only Strings
+    void testRunAlways() throws Exception {
+        KafkaMessageHandler callback = mock(KafkaMessageHandler.class);
+        simpleProducer.setTIME(wait);
+        // Every send() on the mock shuts the producer down, so close() is called once per send()
+        doAnswer(invocation -> {
+            simpleProducer.shutdown();
+            return null;
+        }).when(kafkaProducer).send(any(ProducerRecord.class));
+
+        // Invoking runAlways() in a separate thread to avoid an infinite loop
+        Thread thread = new Thread(() -> {
+            try {
+                simpleProducer.runAlways(topicName, callback);
+            } catch (Exception e) {
+                // expected: the interrupt below ends Thread.sleep() with an InterruptedException
+            }
+        });
+        thread.start();
+        // Interrupt halfway through the producer's first pause (TIME == wait), i.e. after exactly one
+        // send(); sleeping the full 'wait' would race with a second send() and a second close()
+        Thread.sleep(wait / 2);
+        thread.interrupt();
+        thread.join(wait); // let the producer thread finish before verifying
+
+        verify(kafkaProducer, atLeastOnce()).send(any(ProducerRecord.class));
+        verify(kafkaProducer, times(1)).close();
+    }
+
+}
diff --git a/sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/ThreadsControllerTest.java b/sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/ThreadsControllerTest.java
new file mode 100644
index 0000000..d3587f1
--- /dev/null
+++ b/sample-services/ics-producer-consumer/producer/src/test/java/com/demo/producer/ThreadsControllerTest.java
@@ -0,0 +1,89 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package com.demo.producer;
+
+import com.demo.producer.controllers.ThreadsController;
+import com.demo.producer.producer.SimpleProducer;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.mock.web.MockHttpServletRequest;
+import org.springframework.mock.web.MockHttpServletResponse;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.web.context.request.RequestContextHolder;
+import org.springframework.web.context.request.ServletRequestAttributes;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.*;
+
+@SpringBootTest
+@ContextConfiguration(classes = SimpleProducer.class)
+public class ThreadsControllerTest {
+ // NOTE(review): the field carries both @Mock and @Autowired; openMocks(this) in setUp() assigns a Mockito mock to it - confirm the Spring wiring is still needed.
+ @Mock
+ @Autowired
+ private SimpleProducer simpleProducer;
+ // Controller under test; Mockito creates it and injects the mock declared above.
+ @InjectMocks
+ private ThreadsController threadsController;
+
+ private MockHttpServletRequest request;
+ private MockHttpServletResponse response;
+
+ private AutoCloseable closable;
+ // Creates the mocks and registers a mock servlet request/response with RequestContextHolder before each test.
+ @BeforeEach
+ public void setUp() {
+ closable = MockitoAnnotations.openMocks(this);
+ request = new MockHttpServletRequest();
+ response = new MockHttpServletResponse();
+ RequestContextHolder.setRequestAttributes(new ServletRequestAttributes(request, response));
+ }
+ // Releases what openMocks(this) opened.
+ @AfterEach
+ public void close() throws Exception {
+ closable.close();
+ }
+ // stopProducer() must report that nothing is running and must not call shutdown() on the producer.
+ @Test
+ public void testStopProducerWhenNoActiveProducer() throws Exception {
+ String result = threadsController.stopProducer();
+ assertEquals("No active producer to stop", result);
+ verify(simpleProducer, never()).shutdown();
+ }
+ // Only compares the classes of the two futures; the value inside the returned future is not checked.
+ @Test
+ public void testPublishNMessage() throws Exception {
+ String topicName = "testTopic";
+ int numberOfMessages = 10;
+ CompletableFuture<String> future = CompletableFuture.completedFuture("Producer started for topic: " + topicName);
+ CompletableFuture<String> result = threadsController.publishNMessage(numberOfMessages, topicName);
+ assertEquals(future.getClass(), result.getClass());
+ }
+
+}
diff --git a/sample-services/ics-producer-consumer/red.sh b/sample-services/ics-producer-consumer/red.sh
new file mode 100644
index 0000000..12085fe
--- /dev/null
+++ b/sample-services/ics-producer-consumer/red.sh
@@ -0,0 +1,189 @@
+#!/bin/bash
+# ========================LICENSE_START=================================
+# O-RAN-SC
+#
+# Copyright (C) 2024: OpenInfra Foundation Europe
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+# Usage: red.sh [--skip-build] [--no-console]
+skip_build=false
+no_console=false
+
+# Parse command line arguments
+while [[ $# -gt 0 ]]; do
+ case $1 in
+ --skip-build)
+ skip_build=true
+ shift
+ ;;
+ --no-console)
+ no_console=true
+ shift
+ ;;
+ *)
+ echo "Unknown option: $1"
+ exit 1
+ ;;
+ esac
+done
+# Source the utils script
+source utils.sh
+
+# Check Prerequisites
+checkJava
+checkDocker
+checkDockerCompose
+
+if ! $skip_build; then
+    # Build the producer and consumer demo docker images. Each build runs in a
+    # subshell, so the working directory of this script never changes, and a
+    # failed cd or build aborts the run instead of continuing in the wrong place.
+    (cd ./producer/ && make build) || exit 1
+    (cd ./consumer/ && make build) || exit 1
+    # Still in the script directory here, as the docker-compose calls below expect.
+fi
+
+# Start the Docker containers in detached mode
+docker-compose up -d
+
+# Wait for the Kafka container to be running
+wait_for_container "kafka-zkless" "Kafka Server started"
+space
+
+if ! $no_console; then
+ echo "Start RedPanda Console"
+ docker-compose -f docker-composeRedPanda.yaml up -d
+ space
+
+ echo "Start NONRTRIC control panel"
+ docker-compose -f ./docker-compose/docker-compose.yaml -f ./docker-compose/control-panel/docker-compose.yaml -f ./docker-compose/nonrtric-gateway/docker-compose.yaml up -d
+ space
+fi
+
+# Once Kafka container is running, start the producers and consumers
+echo "Kafka container is up and running. Starting producer and consumer..."
+space
+
+echo "Start 1 Producer on mytopic"
+curl -X GET http://localhost:8080/startProducer/mytopic
+space
+
+echo "Start 1 Consumer on mytopic"
+curl -X GET http://localhost:8081/startConsumer/mytopic
+space
+
+sleep 10
+
+echo "Sending type1 to ICS"
+curl -X 'PUT' \
+ 'http://localhost:8083/data-producer/v1/info-types/type1' \
+ -H 'accept: application/json' \
+ -H 'Content-Type: application/json' \
+ -d '{
+ "info_job_data_schema": {
+ "$schema":"http://json-schema.org/draft-07/schema#",
+ "title":"STD_Type1_1.0.0",
+ "description":"Type 1",
+ "type":"object"
+ }
+}'
+
+echo "Getting types from ICS"
+curl -X 'GET' 'http://localhost:8083/data-producer/v1/info-types/type1'
+space
+
+echo "Sending Producer infos to ICS"
+curl -X 'PUT' \
+ 'http://localhost:8083/data-producer/v1/info-producers/1' \
+ -H 'accept: application/json' \
+ -H 'Content-Type: application/json' \
+ -d '{
+ "info_producer_supervision_callback_url": "http://kafka-producer:8080/producer/supervision",
+ "supported_info_types": [
+ "type1"
+ ],
+ "info_job_callback_url": "http://kafka-producer:8080/producer/job"
+}'
+
+echo "Getting Producers Infos from ICS"
+curl -H 'Content-Type: application/json' 'http://localhost:8083/data-producer/v1/info-producers/1'
+space
+
+echo "Sending Consumer Job infos to ICS"
+curl -X 'PUT' \
+ 'http://localhost:8083/data-consumer/v1/info-jobs/1' \
+ -H 'accept: application/json' \
+ -H 'Content-Type: application/json' \
+ -d '{
+ "info_type_id": "type1",
+ "job_owner": "demo",
+ "job_definition": {
+ "deliveryInfo": {
+ "topic": "mytopic",
+ "bootStrapServers": "http://kafka-zkless:9092",
+ "numberOfMessages": 0
+ }
+ },
+ "job_result_uri": "http://kafka-producer:8080/producer/job",
+ "status_notification_uri": "http://kafka-producer:8080/producer/supervision"
+}'
+
+echo "Getting Consumer Job Infos from ICS"
+curl -H 'Content-Type: application/json' 'http://localhost:8083/data-consumer/v1/info-jobs/1'
+space
+
+echo "Sending Consumer Subscription Job infos to ICS"
+curl -X 'PUT' \
+ 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' \
+ -H 'accept: application/json' \
+ -H 'Content-Type: application/json' \
+ -d '{
+ "status_result_uri": "http://kafka-consumer:8081/info-type-status",
+ "owner": "owner"
+}'
+echo "Getting Consumer Subscription Job infos from ICS"
+curl -X 'GET' 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' -H 'accept: application/json'
+space
+
+sleep 5
+echo "ICS Producer Docker logs "
+docker logs informationcoordinatorservice | grep -E 'o.o.i.c.r1producer.ProducerCallbacks|o.o.i.repository.InfoTypeSubscriptions'
+space
+echo "Demo Producer Docker logs "
+docker logs kafka-producer | grep c.d.p.p.SimpleProducer
+space
+echo "Demo Consumer Docker logs "
+docker logs kafka-consumer | grep c.d.c.c.SimpleConsumer
+space
+
+if ! $no_console; then
+ echo "Red Panda Console: http://localhost:8888"
+ echo "Control Panel Console: http://localhost:8181"
+fi
+
+echo "Done."
+
+containers=("kafka-producer" "kafka-consumer")
+# docker logs replays the container's stderr on stderr, so merge it into the pipe for grep.
+for container in "${containers[@]}"; do
+    if docker logs "$container" 2>&1 | grep -q ERROR; then
+        echo "Errors found in logs of $container"
+        echo "FAIL"
+        exit 1
+    else
+        echo "No errors found in logs of $container"
+    fi
+done
+echo "SUCCESS"
+exit 0
diff --git a/sample-services/ics-producer-consumer/runconsumer.sh b/sample-services/ics-producer-consumer/runconsumer.sh
new file mode 100644
index 0000000..9ef776a
--- /dev/null
+++ b/sample-services/ics-producer-consumer/runconsumer.sh
@@ -0,0 +1,2 @@
+#!/bin/bash
+curl -X GET http://localhost:8081/startConsumer/mytopic
\ No newline at end of file
diff --git a/sample-services/ics-producer-consumer/runproducer.sh b/sample-services/ics-producer-consumer/runproducer.sh
new file mode 100644
index 0000000..f9f5a7e
--- /dev/null
+++ b/sample-services/ics-producer-consumer/runproducer.sh
@@ -0,0 +1,2 @@
+#!/bin/bash
+curl -X GET http://localhost:8080/startProducer/mytopic
\ No newline at end of file
diff --git a/sample-services/ics-producer-consumer/start.sh b/sample-services/ics-producer-consumer/start.sh
new file mode 100755
index 0000000..18317a0
--- /dev/null
+++ b/sample-services/ics-producer-consumer/start.sh
@@ -0,0 +1,180 @@
+#!/bin/bash
+# ========================LICENSE_START=================================
+# O-RAN-SC
+#
+# Copyright (C) 2024: OpenInfra Foundation Europe
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+source utils.sh
+
+PREFIX="nexus3.o-ran-sc.org:10004"
+VERSION="0.0.1"
+
+# Create a network for Kafka Containers
+docker network create kafka-net
+
+# Start Kafka
+docker run -d \
+ --network kafka-net \
+ --name kafka-zkless \
+ -p 9092:9092 \
+ -e LOG_DIR="/tmp/logs" \
+ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP="CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT" \
+ -e KAFKA_LISTENERS="PLAINTEXT://:29092,PLAINTEXT_HOST://:9092,CONTROLLER://:9093" \
+ -e KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://kafka-zkless:29092,PLAINTEXT_HOST://kafka-zkless:9092" \
+ quay.io/strimzi/kafka:latest-kafka-2.8.1-amd64 \
+ /bin/sh -c 'export CLUSTER_ID=$(bin/kafka-storage.sh random-uuid) && \
+ bin/kafka-storage.sh format -t $CLUSTER_ID -c config/kraft/server.properties && \
+ bin/kafka-server-start.sh config/kraft/server.properties --override advertised.listeners=$KAFKA_ADVERTISED_LISTENERS --override listener.security.protocol.map=$KAFKA_LISTENER_SECURITY_PROTOCOL_MAP --override listeners=$KAFKA_LISTENERS'
+
+# Start ICS
+docker run -d \
+ --network kafka-net \
+ --name informationcoordinatorservice \
+ -p 8083:8083 \
+ -v ./application.yaml:/opt/app/information-coordinator-service/config/application.yaml \
+ nexus3.o-ran-sc.org:10001/o-ran-sc/nonrtric-plt-informationcoordinatorservice:1.6.0
+
+# Start Producer
+docker run -d \
+ --network kafka-net \
+ --name kafka-producer \
+ -p 8080:8080 \
+ -e KAFKA_SERVERS=kafka-zkless:9092 \
+ $PREFIX/o-ran-sc/nonrtric-sample-icsproducer:$VERSION
+
+#Start Consumer
+docker run -d \
+ --network kafka-net \
+ --name kafka-consumer \
+ -p 8081:8081 \
+ -e KAFKA_SERVERS=kafka-zkless:9092 \
+ $PREFIX/o-ran-sc/nonrtric-sample-icsconsumer:$VERSION
+
+# Wait for the Kafka, producer and consumer containers to report that they have started
+wait_for_container "kafka-zkless" "Kafka Server started"
+wait_for_container "kafka-producer" "Started Application"
+wait_for_container "kafka-consumer" "Started Application"
+
+# Once Kafka container is running, start the producers and consumers
+echo "Kafka container is up and running. Starting producer and consumer..."
+space
+
+echo "Start 1 Producer on mytopic"
+curl -X GET http://localhost:8080/startProducer/mytopic
+space
+
+echo "Start 1 Consumer on mytopic"
+curl -X GET http://localhost:8081/startConsumer/mytopic
+space
+
+sleep 10
+
+echo "Sending type1 to ICS"
+curl -X 'PUT' \
+ 'http://localhost:8083/data-producer/v1/info-types/type1' \
+ -H 'accept: application/json' \
+ -H 'Content-Type: application/json' \
+ -d '{
+ "info_job_data_schema": {
+ "$schema":"http://json-schema.org/draft-07/schema#",
+ "title":"STD_Type1_1.0.0",
+ "description":"Type 1",
+ "type":"object"
+ }
+}'
+
+echo "Getting types from ICS"
+curl -X 'GET' 'http://localhost:8083/data-producer/v1/info-types/type1'
+space
+
+echo "Sending Producer infos to ICS"
+curl -X 'PUT' \
+ 'http://localhost:8083/data-producer/v1/info-producers/1' \
+ -H 'accept: application/json' \
+ -H 'Content-Type: application/json' \
+ -d '{
+ "info_producer_supervision_callback_url": "http://kafka-producer:8080/producer/supervision",
+ "supported_info_types": [
+ "type1"
+ ],
+ "info_job_callback_url": "http://kafka-producer:8080/producer/job"
+}'
+
+echo "Getting Producers Infos from ICS"
+curl -H 'Content-Type: application/json' 'http://localhost:8083/data-producer/v1/info-producers/1'
+space
+
+echo "Sending Consumer Job infos to ICS"
+curl -X 'PUT' \
+ 'http://localhost:8083/data-consumer/v1/info-jobs/1' \
+ -H 'accept: application/json' \
+ -H 'Content-Type: application/json' \
+ -d '{
+ "info_type_id": "type1",
+ "job_owner": "demo",
+ "job_definition": {
+ "deliveryInfo": {
+ "topic": "mytopic",
+ "bootStrapServers": "http://kafka-zkless:9092",
+ "numberOfMessages": 0
+ }
+ },
+ "job_result_uri": "http://kafka-producer:8080/producer/job",
+ "status_notification_uri": "http://kafka-producer:8080/producer/supervision"
+}'
+
+echo "Getting Consumer Job Infos from ICS"
+curl -H 'Content-Type: application/json' 'http://localhost:8083/data-consumer/v1/info-jobs/1'
+space
+
+echo "Sending Consumer Subscription Job infos to ICS"
+curl -X 'PUT' \
+ 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' \
+ -H 'accept: application/json' \
+ -H 'Content-Type: application/json' \
+ -d '{
+ "status_result_uri": "http://kafka-consumer:8081/info-type-status",
+ "owner": "owner"
+}'
+echo "Getting Consumer Subscription Job infos from ICS"
+curl -X 'GET' 'http://localhost:8083/data-consumer/v1/info-type-subscription/1' -H 'accept: application/json'
+space
+
+sleep 5
+echo "ICS Producer Docker logs "
+docker logs informationcoordinatorservice | grep -E 'o.o.i.c.r1producer.ProducerCallbacks|o.o.i.repository.InfoTypeSubscriptions'
+space
+echo "Demo Producer Docker logs "
+docker logs kafka-producer | grep c.d.p.p.SimpleProducer
+space
+echo "Demo Consumer Docker logs "
+docker logs kafka-consumer | grep c.d.c.c.SimpleConsumer
+space
+
+echo "Done."
+
+containers=("kafka-producer" "kafka-consumer")
+# docker logs replays the container's stderr on stderr, so merge it into the pipe for grep.
+for container in "${containers[@]}"; do
+    if docker logs "$container" 2>&1 | grep -q ERROR; then
+        echo "Errors found in logs of $container"
+        echo "FAIL"
+        exit 1
+    else
+        echo "No errors found in logs of $container"
+    fi
+done
+echo "SUCCESS"
+exit 0
diff --git a/sample-services/ics-producer-consumer/stop.sh b/sample-services/ics-producer-consumer/stop.sh
new file mode 100644
index 0000000..60fba31
--- /dev/null
+++ b/sample-services/ics-producer-consumer/stop.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+# ========================LICENSE_START=================================
+# O-RAN-SC
+#
+# Copyright (C) 2024: OpenInfra Foundation Europe
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+
+# Bring down the stacks of the default compose file and of the RedPanda variant.
+docker-compose down
+docker-compose -f docker-composeRedPanda.yaml down
+# Bring down the combined stack described by the three files under ./docker-compose.
+docker-compose -f ./docker-compose/docker-compose.yaml \
+    -f ./docker-compose/control-panel/docker-compose.yaml \
+    -f ./docker-compose/nonrtric-gateway/docker-compose.yaml \
+    down
diff --git a/sample-services/ics-producer-consumer/utils.sh b/sample-services/ics-producer-consumer/utils.sh
new file mode 100644
index 0000000..68d19ec
--- /dev/null
+++ b/sample-services/ics-producer-consumer/utils.sh
@@ -0,0 +1,98 @@
+# ========================LICENSE_START=================================
+# O-RAN-SC
+#
+# Copyright (C) 2024: OpenInfra Foundation Europe
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+
+#!/bin/bash
+
+checkJava() {
+    # Pass when `command -v java` succeeds; otherwise print an install hint and exit 1.
+    if command -v java >/dev/null 2>&1; then
+        echo "Java is installed."
+        return
+    fi
+    printf '%s\n' "Java is not installed. Please install Java." \
+        "Suggested fix for ubuntu:" "sudo apt install default-jdk"
+    exit 1
+}
+
+checkMaven() {
+    # `mvn -v` must run successfully; otherwise print an install hint and exit 1.
+    if ! mvn -v >/dev/null 2>&1; then
+        echo "Maven is not installed. Please install Maven."
+        echo "Suggested fix for ubuntu:"
+        echo "sudo apt install maven"
+        exit 1
+    fi
+    echo "Maven is installed."
+}
+
+checkDocker() {
+    # Probe the docker CLI with `docker -v`; on failure print Ubuntu install steps and exit 1.
+    if docker -v > /dev/null 2>&1; then
+        echo "Docker is installed."
+        return
+    fi
+    # Single quotes keep $(...) and $USER literal in the printed instructions.
+    printf '%s\n' 'Docker is not installed. Please install Docker.' \
+        'Suggested fix for ubuntu:' 'sudo apt-get update' \
+        'sudo apt-get install -y apt-transport-https ca-certificates curl gnupg lsb-release' \
+        'curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg' \
+        'echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null' \
+        'sudo apt-get update' \
+        'sudo apt-get install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin' \
+        'sudo usermod -aG docker $USER' 'newgrp docker'
+    exit 1
+}
+
+checkDockerCompose() {
+    # NOTE(review): docker-compose-plugin usually provides `docker compose`, not the `docker-compose` binary probed here - confirm hint.
+    if docker-compose -v > /dev/null 2>&1; then
+        echo "docker-compose is installed."
+        return
+    fi
+    printf '%s\n' "docker-compose is not installed. Please install docker-compose" \
+        "Suggested fix for ubuntu:" "sudo apt-get install docker-compose-plugin"
+    exit 1
+}
+
+# Block until container $1 exists, reports status "running", and its logs (stdout and
+# stderr) contain a line matching grep pattern $2. Polls every 5 seconds; no timeout.
+wait_for_container() {
+    local container_name="$1" log_string="$2"
+
+    until docker inspect "$container_name" &>/dev/null; do
+        echo "Waiting for container '$container_name' to be created..."
+        sleep 5
+    done
+
+    until [[ "$(docker inspect -f '{{.State.Status}}' "$container_name")" == "running" ]]; do
+        echo "Waiting for container '$container_name' to be running..."
+        sleep 5
+    done
+
+    # `--` ends option parsing so a pattern starting with '-' is not read as a grep flag.
+    until docker logs "$container_name" 2>&1 | grep -- "$log_string"; do
+        echo "Waiting for '$log_string' in container logs of '$container_name'..."
+        sleep 5
+    done
+}
+
+space() {
+    # Emit a blank line, a row of '+' characters, and another blank line.
+    printf '\n%s\n\n' \
+        "++++++++++++++++++++++++++++++++++++++++++++++++++++"
+}