New simple kafka producer microservice
Using Spring kafka template in a simple reactive way
Issue-ID: NONRTRIC-1009
Change-Id: I5af9dfeb546531e173785b63c52c52240bbd938a
Signed-off-by: lapentafd <francesco.lapenta@est.tech>
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/.gitignore b/sample-services/ics-simple-producer-consumer/kafka-producer/.gitignore
new file mode 100644
index 0000000..549e00a
--- /dev/null
+++ b/sample-services/ics-simple-producer-consumer/kafka-producer/.gitignore
@@ -0,0 +1,33 @@
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/Dockerfile b/sample-services/ics-simple-producer-consumer/kafka-producer/Dockerfile
new file mode 100644
index 0000000..c36376d
--- /dev/null
+++ b/sample-services/ics-simple-producer-consumer/kafka-producer/Dockerfile
@@ -0,0 +1,30 @@
+#==================================================================================
+# Copyright (C) 2024: OpenInfra Foundation Europe
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This source code is part of the near-RT RIC (RAN Intelligent Controller)
+# platform project (RICP).
+#==================================================================================
+FROM openjdk:17-jdk-slim
+
+EXPOSE 8080
+
+ARG SPRING_KAFKA_SERVER
+ENV SPRING_KAFKA_SERVER=${SPRING_KAFKA_SERVER}
+
+WORKDIR /app
+
+COPY target/kafka-producer-0.0.1-SNAPSHOT.jar /app/producer-0.0.1.jar
+
+CMD ["java", "-jar", "producer-0.0.1.jar"]
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/container-tag.yaml b/sample-services/ics-simple-producer-consumer/kafka-producer/container-tag.yaml
new file mode 100644
index 0000000..48290af
--- /dev/null
+++ b/sample-services/ics-simple-producer-consumer/kafka-producer/container-tag.yaml
@@ -0,0 +1,23 @@
+# ========================LICENSE_START=================================
+# O-RAN-SC
+#
+# Copyright (C) 2024: OpenInfra Foundation Europe
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+
+# The Jenkins job requires a tag to build the Docker image.
+# By default this file is in the docker build directory,
+# but the location can configured in the JJB template.
+---
+tag: 0.0.1
\ No newline at end of file
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/pom.xml b/sample-services/ics-simple-producer-consumer/kafka-producer/pom.xml
new file mode 100644
index 0000000..008484e
--- /dev/null
+++ b/sample-services/ics-simple-producer-consumer/kafka-producer/pom.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>3.3.2</version>
+ <relativePath/>
+ </parent>
+ <groupId>com.demo</groupId>
+ <artifactId>kafka-producer</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <name>kafka-producer</name>
+ <description>Demo project for Spring Boot and Kafka producer</description>
+
+ <properties>
+ <java.version>17</java.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>0.45.0</version>
+ <inherited>false</inherited>
+ <executions>
+ <execution>
+ <id>generate-producer-image</id>
+ <phase>package</phase>
+ <goals>
+ <goal>build</goal>
+ </goals>
+ <configuration>
+ <pullRegistry>${env.CONTAINER_PULL_REGISTRY}</pullRegistry>
+ <images>
+ <image>
+ <name>
+ o-ran-sc/nonrtric-simple-icsproducer:latest
+ </name>
+ <build>
+ <cleanup>try</cleanup>
+ <contextDir>${project.basedir}</contextDir>
+ <dockerFile>Dockerfile</dockerFile>
+ <args>
+ <JAR>${project.build.finalName}.jar</JAR>
+ </args>
+ <tags>
+ <tag>latest</tag>
+ </tags>
+ </build>
+ </image>
+ </images>
+ </configuration>
+ </execution>
+ <execution>
+ <id>push-producer-image</id>
+ <goals>
+ <goal>build</goal>
+ <goal>push</goal>
+ </goals>
+ <configuration>
+ <pullRegistry>${env.CONTAINER_PULL_REGISTRY}</pullRegistry>
+ <pushRegistry>${env.CONTAINER_PUSH_REGISTRY}</pushRegistry>
+ <images>
+ <image>
+ <name>
+ o-ran-sc/nonrtric-simple-icsproducer:latest
+ </name>
+ <build>
+ <contextDir>${project.basedir}</contextDir>
+ <dockerFile>Dockerfile</dockerFile>
+ <args>
+ <JAR>${project.build.finalName}.jar</JAR>
+ </args>
+ <tags>
+ <tag>${project.version}</tag>
+ <tag>latest</tag>
+ </tags>
+ </build>
+ </image>
+ </images>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/KafkaProducerApplication.java b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/KafkaProducerApplication.java
new file mode 100644
index 0000000..4de2dc4
--- /dev/null
+++ b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/KafkaProducerApplication.java
@@ -0,0 +1,32 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class KafkaProducerApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(KafkaProducerApplication.class, args);
+ }
+
+}
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/config/KafkaConfiguration.java b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/config/KafkaConfiguration.java
new file mode 100644
index 0000000..309bce3
--- /dev/null
+++ b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/config/KafkaConfiguration.java
@@ -0,0 +1,62 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer.config;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+@EnableConfigurationProperties
+public class KafkaConfiguration {
+
+ @Value("${spring.kafka.server}")
+ private String server;
+
+ @Bean
+ public Map<String, Object> producerConfigs() {
+ Map<String, Object> props = new HashMap<>();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ return props;
+ }
+
+ @Bean
+ public ProducerFactory<String, String> producerFactory() {
+ return new DefaultKafkaProducerFactory<>(producerConfigs());
+ }
+
+ @Bean
+ public KafkaTemplate<String, String> kafkaTemplate() {
+ return new KafkaTemplate<String, String>(producerFactory());
+ }
+
+}
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/controller/CallbacksController.java b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/controller/CallbacksController.java
new file mode 100644
index 0000000..6495b96
--- /dev/null
+++ b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/controller/CallbacksController.java
@@ -0,0 +1,74 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer.controller;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.client.RestTemplate;
+
+@EnableConfigurationProperties
+@RestController
+@RequestMapping(produces = "application/json")
+public class CallbacksController {
+ private final Logger logger = LoggerFactory.getLogger(CallbacksController.class);
+
+ private final RestTemplate restTemplate = new RestTemplate();
+
+ @Autowired
+ KafkaController kafkaController;
+
+ @GetMapping("/health-check") // defined in ICS ProducerSupervisionCallbackUrl
+ public ResponseEntity<String> getHealthCheck() {
+ logger.info("Post Info Type Status");
+ return new ResponseEntity<>("Ok", HttpStatus.OK);
+ }
+
+ @PostMapping("/info-job") // defined in ICS JobCallbackUrl
+ public ResponseEntity<String> startJob(@RequestBody String requestBody) { // defined in ICS CallbackBody
+ logger.info("Start Job");
+ kafkaController.postMessageMono(requestBody);
+ return new ResponseEntity<>("Ok", HttpStatus.OK);
+ }
+
+ @DeleteMapping("/info-job/{jobID}") // defined in ICS JobCallbackUrl
+ public ResponseEntity<String> stopJob() {
+ logger.info("Stop Job");
+ // Call the shutdown endpoint
+ HttpHeaders headers = new HttpHeaders();
+ headers.setContentType(MediaType.APPLICATION_JSON);
+ HttpEntity<String> entity = new HttpEntity<String>(null, headers);
+ String shutdownUrl = "http://localhost:8080" + "/actuator/shutdown";
+ return restTemplate.postForEntity(shutdownUrl, entity, String.class);
+ //return new ResponseEntity<>("Ok", HttpStatus.OK);
+ }
+}
\ No newline at end of file
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/controller/KafkaController.java b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/controller/KafkaController.java
new file mode 100644
index 0000000..723592c
--- /dev/null
+++ b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/controller/KafkaController.java
@@ -0,0 +1,56 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer.controller;
+
+import java.lang.invoke.MethodHandles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import reactor.core.publisher.Mono;
+
+@EnableConfigurationProperties
+@RestController
+@RequestMapping()
+public class KafkaController {
+ private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @Value("${spring.kafka.topic}")
+ private String topic;
+
+ @Autowired
+ private KafkaTemplate<String,String> kafkaTemplate;
+
+ @GetMapping("/publish/{name}")
+ public Mono<String> postMessageMono(@PathVariable("name") final String name) {
+ return Mono.fromFuture(kafkaTemplate.send(topic, name))
+ .doOnSuccess(result -> logger.info("Published: " + name))
+ .thenReturn("Message Published Successfully")
+ .doOnError(err -> logger.error("Unable to Publish: " + name));
+ }
+
+}
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/model/Job.java b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/model/Job.java
new file mode 100644
index 0000000..d640be0
--- /dev/null
+++ b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/model/Job.java
@@ -0,0 +1,29 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer.model;
+
+import lombok.Data;
+import lombok.ToString;
+
+@Data
+@ToString
+public class Job {
+ private Object job;
+}
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/service/JobService.java b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/service/JobService.java
new file mode 100644
index 0000000..80f69da
--- /dev/null
+++ b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/service/JobService.java
@@ -0,0 +1,36 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer.service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.springframework.stereotype.Service;
+import com.demo.kafkaproducer.model.Job;
+
+import lombok.Data;
+
+@Data
+@Service
+public class JobService {
+
+ public List<Job> jobList = new ArrayList<>();
+
+}
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/resources/application.yaml b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/resources/application.yaml
new file mode 100644
index 0000000..5bc0e85
--- /dev/null
+++ b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/resources/application.yaml
@@ -0,0 +1,39 @@
+# ========================LICENSE_START=================================
+# O-RAN-SC
+#
+# Copyright (C) 2024: OpenInfra Foundation Europe
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+server:
+ port: 8080
+spring:
+ application:
+ name: producer
+ kafka:
+ server: ${SPRING_KAFKA_SERVER:localhost:9092}
+ topic: mytopic
+ group: group-1
+ producer:
+ value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+management:
+ endpoints:
+ web:
+ exposure:
+ include: "info, health, shutdown"
+ endpoint:
+ shutdown:
+ enabled: true
+endpoints:
+ shutdown:
+ enabled: true
\ No newline at end of file
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/test/java/com/demo/kafkaproducer/CallbacksControllerTest.java b/sample-services/ics-simple-producer-consumer/kafka-producer/src/test/java/com/demo/kafkaproducer/CallbacksControllerTest.java
new file mode 100644
index 0000000..94a6ce9
--- /dev/null
+++ b/sample-services/ics-simple-producer-consumer/kafka-producer/src/test/java/com/demo/kafkaproducer/CallbacksControllerTest.java
@@ -0,0 +1,92 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer;
+
+import static org.mockito.Mockito.*;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
+
+import com.demo.kafkaproducer.controller.CallbacksController;
+import com.demo.kafkaproducer.controller.KafkaController;
+
+import reactor.core.publisher.Mono;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@SpringBootTest
+public class CallbacksControllerTest {
+
+ @Mock
+ private RestTemplate restTemplate;
+
+ @Mock
+ private KafkaController kafkaController;
+
+
+ @InjectMocks
+ private CallbacksController callbacksController;
+
+ @BeforeEach
+ public void setup() {
+ MockitoAnnotations.openMocks(this);
+ }
+
+ @Test
+ public void testGetHealthCheck() {
+ ResponseEntity<String> response = callbacksController.getHealthCheck();
+
+ assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
+ assertThat(response.getBody()).isEqualTo("Ok");
+ }
+
+ @Test
+ public void testStartJob() {
+ String requestBody = "testJob";
+ when(kafkaController.postMessageMono(requestBody)).thenReturn(Mono.just("Message Published Successfully"));
+
+ ResponseEntity<String> response = callbacksController.startJob(requestBody);
+
+ assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
+ assertThat(response.getBody()).isEqualTo("Ok");
+ verify(kafkaController, times(1)).postMessageMono(requestBody);
+ }
+
+ @Test
+ public void testActuatorStopJob() {
+ String shutdownUrl = "http://localhost:8080/actuator/shutdown";
+ HttpHeaders headers = new HttpHeaders();
+ headers.setContentType(MediaType.APPLICATION_JSON);
+ HttpEntity<String> entity = new HttpEntity<>(null, headers);
+
+ ResponseEntity<String> restTemplateResponse = new ResponseEntity<>("Shutdown successful", HttpStatus.OK);
+ when(restTemplate.postForEntity(shutdownUrl, entity, String.class)).thenReturn(restTemplateResponse);
+
+ }
+}
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/test/java/com/demo/kafkaproducer/KafkaProducerApplicationTests.java b/sample-services/ics-simple-producer-consumer/kafka-producer/src/test/java/com/demo/kafkaproducer/KafkaProducerApplicationTests.java
new file mode 100644
index 0000000..97b2775
--- /dev/null
+++ b/sample-services/ics-simple-producer-consumer/kafka-producer/src/test/java/com/demo/kafkaproducer/KafkaProducerApplicationTests.java
@@ -0,0 +1,32 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class KafkaProducerApplicationTests {
+
+ @Test
+ void contextLoads() {
+ }
+
+}
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/test/java/com/demo/kafkaproducer/KafkaProducerControllerTest.java b/sample-services/ics-simple-producer-consumer/kafka-producer/src/test/java/com/demo/kafkaproducer/KafkaProducerControllerTest.java
new file mode 100644
index 0000000..82201a1
--- /dev/null
+++ b/sample-services/ics-simple-producer-consumer/kafka-producer/src/test/java/com/demo/kafkaproducer/KafkaProducerControllerTest.java
@@ -0,0 +1,97 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer;
+
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import com.demo.kafkaproducer.controller.KafkaController;
+
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+public class KafkaProducerControllerTest {
+
+ @Mock
+ private KafkaTemplate<String, String> kafkaTemplate;
+
+ @InjectMocks
+ private KafkaController kafkaController;
+
+ private String topic = "test-topic";
+
+ @BeforeEach
+ public void setup() {
+ MockitoAnnotations.openMocks(this);
+ // Set the value of the topic field (since @Value won't be processed in unit tests)
+ ReflectionTestUtils.setField(kafkaController, "topic", topic);
+ }
+
+ @Test
+ public void testPostMessageMono_Success() {
+ // Arrange
+ String name = "testMessage";
+ CompletableFuture<SendResult<String, String>> future = CompletableFuture.completedFuture(null);
+
+ when(kafkaTemplate.send(topic, name)).thenReturn(future);
+
+ // Act
+ Mono<String> result = kafkaController.postMessageMono(name);
+
+ // Assert
+ StepVerifier.create(result)
+ .expectNext("Message Published Successfully")
+ .verifyComplete();
+
+ verify(kafkaTemplate, times(1)).send(topic, name);
+ }
+
+ @Test
+ public void testPostMessageMono_Failure() {
+ // Arrange
+ String name = "testMessage";
+ CompletableFuture<SendResult<String, String>> future = CompletableFuture.failedFuture(new RuntimeException("Kafka error"));
+
+ when(kafkaTemplate.send(topic, name)).thenReturn(future);
+
+ // Act
+ Mono<String> result = kafkaController.postMessageMono(name);
+
+ // Assert
+ StepVerifier.create(result)
+ .expectError(RuntimeException.class)
+ .verify();
+
+ verify(kafkaTemplate, times(1)).send(topic, name);
+ }
+
+}