Patch #2 : Introduce kafka template for cloud events

 - Introduced a new cloud kafka template for cloud events that reads
   it's configuration from application.yml
 - Kept legacy kafka template for backward compatibility utill all cps
   events moved to cloud event comply.
 - Modified application.yml producer and consumer value deserializer properties to support cloud events.
 - Added new cloudevents-bom used into cps-ncmp-service pom.
 - For the time being we will have 2 kafkatemplates (legacyEventKafkaTemplate, cloudEventKafkaTemplate) into EventsPublisher until we fully move to
  cloudevents for all events. Once all cps events will be cloud event compy, we have TODO task where Deprecated: legacyKafkaEventTemplate will be removed with its
  java configuration file KafkaTemplateConfig.

Issue-ID: CPS-1724

Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
Change-Id: I78c15bd480db063b89c6630c46c2d3a328b4fae4
Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml
index 802da9e..ed71339 100644
--- a/cps-application/src/main/resources/application.yml
+++ b/cps-application/src/main/resources/application.yml
@@ -75,7 +75,7 @@
         security:
             protocol: PLAINTEXT
         producer:
-            value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+            value-serializer: io.cloudevents.kafka.CloudEventSerializer
             client-id: cps-core
         consumer:
             group-id: ${NCMP_CONSUMER_GROUP_ID:ncmp-group}
@@ -83,7 +83,7 @@
             value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
             properties:
                 spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
-                spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
+                spring.deserializer.value.delegate.class: io.cloudevents.kafka.CloudEventDeserializer
                 spring.json.use.type.headers: false
 
     jackson:
diff --git a/cps-dependencies/pom.xml b/cps-dependencies/pom.xml
index e06bbd7..f792c9c 100755
--- a/cps-dependencies/pom.xml
+++ b/cps-dependencies/pom.xml
@@ -147,6 +147,13 @@
                 <version>3.1</version>
             </dependency>
             <dependency>
+                <groupId>io.cloudevents</groupId>
+                <artifactId>cloudevents-bom</artifactId>
+                <version>2.5.0</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+            <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-lang3</artifactId>
                 <version>3.11</version>
diff --git a/cps-ncmp-service/pom.xml b/cps-ncmp-service/pom.xml
index b87fe64..bbcb687 100644
--- a/cps-ncmp-service/pom.xml
+++ b/cps-ncmp-service/pom.xml
@@ -42,6 +42,18 @@
             <artifactId>commons-lang3</artifactId>
         </dependency>
         <dependency>
+            <groupId>io.cloudevents</groupId>
+            <artifactId>cloudevents-json-jackson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.cloudevents</groupId>
+            <artifactId>cloudevents-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.cloudevents</groupId>
+            <artifactId>cloudevents-spring</artifactId>
+        </dependency>
+        <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>cps-service</artifactId>
         </dependency>
@@ -54,8 +66,8 @@
             <artifactId>cps-path-parser</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.springframework</groupId>
-            <artifactId>spring-web</artifactId>
+            <groupId>com.hazelcast</groupId>
+            <artifactId>hazelcast-spring</artifactId>
         </dependency>
         <dependency>
             <groupId>org.mapstruct</groupId>
@@ -66,8 +78,8 @@
             <artifactId>mapstruct-processor</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.hazelcast</groupId>
-            <artifactId>hazelcast-spring</artifactId>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-web</artifactId>
         </dependency>
         <!-- T E S T - D E P E N D E N C I E S -->
         <dependency>
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java
new file mode 100644
index 0000000..b76f86e
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfig.java
@@ -0,0 +1,127 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.config.kafka;
+
+import io.cloudevents.CloudEvent;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+/**
+ * kafka Configuration for legacy and cloud events.
+ *
+ * @param <T> valid legacy event to be published over the wire.
+ */
+@Configuration
+@EnableKafka
+@RequiredArgsConstructor
+public class KafkaTemplateConfig<T> {
+
+    private final KafkaProperties kafkaProperties;
+
+    /**
+     * This sets the strategy for creating legacy Kafka producer instance from kafka properties defined into
+     * application.yml and replaces value-serializer by JsonSerializer.
+     *
+     * @return legacy event producer instance.
+     */
+    @Bean
+    public ProducerFactory<String, T> legacyEventProducerFactory() {
+        final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
+        producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+        return new DefaultKafkaProducerFactory<>(producerConfigProperties);
+    }
+
+    /**
+     * The ConsumerFactory implementation is to produce new legacy instance for provided kafka properties defined
+     * into application.yml and replaces deserializer-value by JsonDeserializer.
+     *
+     * @return an instance of legacy consumer factory.
+     */
+    @Bean
+    public ConsumerFactory<String, T> legacyEventConsumerFactory() {
+        final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
+        consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class);
+        return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
+    }
+
+    /**
+     * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into
+     * application.yml with CloudEventSerializer.
+     *
+     * @return cloud event producer instance.
+     */
+    @Bean
+    public ProducerFactory<String, CloudEvent> cloudEventProducerFactory() {
+        final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
+        return new DefaultKafkaProducerFactory<>(producerConfigProperties);
+    }
+
+    /**
+     * The ConsumerFactory implementation to produce new legacy instance for provided kafka properties defined
+     * into application.yml having CloudEventDeserializer as deserializer-value.
+     *
+     * @return an instance of cloud consumer factory.
+     */
+    @Bean
+    public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactory() {
+        final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
+        return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
+    }
+
+    /**
+     * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this.
+     *
+     * @return an instance of legacy Kafka template.
+     */
+    @Bean
+    @Primary
+    public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
+        final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
+        kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
+        return kafkaTemplate;
+    }
+
+    /**
+     * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this.
+     *
+     * @return an instance of cloud Kafka template.
+     */
+    @Bean
+    public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
+        final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory());
+        kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
+        return kafkaTemplate;
+    }
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
index d92316d..7b28b4c 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
@@ -20,6 +20,7 @@
 
 package org.onap.cps.ncmp.api.impl.events;
 
+import io.cloudevents.CloudEvent;
 import java.util.Map;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -42,7 +43,12 @@
 @RequiredArgsConstructor
 public class EventsPublisher<T> {
 
-    private final KafkaTemplate<String, T> eventKafkaTemplate;
+    /** Once all cps events will be modified to cloud compliant, will remove legacyKafkaEventTemplate with
+     it's java configuration file KafkaTemplateConfig. **/
+    @Deprecated(forRemoval = true)
+    private final KafkaTemplate<String, T> legacyKafkaEventTemplate;
+
+    private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
 
     /**
      * Generic Event publisher.
@@ -54,7 +60,8 @@
      */
     @Deprecated
     public void publishEvent(final String topicName, final String eventKey, final T event) {
-        final ListenableFuture<SendResult<String, T>> eventFuture = eventKafkaTemplate.send(topicName, eventKey, event);
+        final ListenableFuture<SendResult<String, T>> eventFuture
+                = legacyKafkaEventTemplate.send(topicName, eventKey, event);
         eventFuture.addCallback(handleCallback(topicName));
     }
 
@@ -70,7 +77,7 @@
 
         final ProducerRecord<String, T> producerRecord =
                 new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders);
-        final ListenableFuture<SendResult<String, T>> eventFuture = eventKafkaTemplate.send(producerRecord);
+        final ListenableFuture<SendResult<String, T>> eventFuture = legacyKafkaEventTemplate.send(producerRecord);
         eventFuture.addCallback(handleCallback(topicName));
     }
 
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
index bcf75a2..fe7b3f1 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
@@ -22,6 +22,7 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
 import org.mapstruct.factory.Mappers
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
@@ -34,7 +35,6 @@
 import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.test.annotation.DirtiesContext
 import org.testcontainers.spock.Testcontainers
-
 import java.time.Duration
 
 @SpringBootTest(classes = [EventsPublisher, NcmpAsyncRequestResponseEventConsumer, ObjectMapper, JsonObjectMapper])
@@ -44,7 +44,7 @@
 
     @SpringBean
     EventsPublisher cpsAsyncRequestResponseEventPublisher =
-        new EventsPublisher<NcmpAsyncRequestResponseEvent>(kafkaTemplate);
+        new EventsPublisher<NcmpAsyncRequestResponseEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate);
 
 
     @SpringBean
@@ -59,18 +59,18 @@
     @Autowired
     JsonObjectMapper jsonObjectMapper
 
-    def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('test'))
+    def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer))
 
     def 'Consume and forward valid message'() {
         given: 'consumer has a subscription'
-            kafkaConsumer.subscribe(['test-topic'] as List<String>)
+            legacyEventKafkaConsumer.subscribe(['test-topic'] as List<String>)
         and: 'an event is sent'
             def jsonData = TestUtils.getResourceFileContent('dmiAsyncRequestResponseEvent.json')
             def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DmiAsyncRequestResponseEvent.class)
         when: 'the event is consumed'
             ncmpAsyncRequestResponseEventConsumer.consumeAndForward(testEventSent)
         and: 'the topic is polled'
-            def records = kafkaConsumer.poll(Duration.ofMillis(1500))
+            def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
         then: 'poll returns one record'
             assert records.size() == 1
         and: 'consumed forwarded event id is the same as sent event id'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy
index 28464bb..02071cd 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncBatchEventConsumerSpec.groovy
@@ -25,6 +25,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.serialization.StringDeserializer
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
 import org.onap.cps.ncmp.events.async.BatchDataResponseEventV1
@@ -46,7 +47,7 @@
 class NcmpAsyncBatchEventConsumerSpec extends MessagingBaseSpec {
 
     @SpringBean
-    EventsPublisher asyncBatchEventPublisher = new EventsPublisher<BatchDataResponseEventV1>(kafkaTemplate)
+    EventsPublisher asyncBatchEventPublisher = new EventsPublisher<BatchDataResponseEventV1>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     @SpringBean
     NcmpAsyncBatchEventConsumer asyncBatchEventConsumer = new NcmpAsyncBatchEventConsumer(asyncBatchEventPublisher)
@@ -57,19 +58,19 @@
     @Autowired
     RecordFilterStrategy<String, BatchDataResponseEventV1> recordFilterStrategy
 
-    def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('test'))
+    def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer))
     def static clientTopic = 'client-topic'
     def static batchEventType = 'org.onap.cps.ncmp.events.async.BatchDataResponseEventV1'
 
     def 'Consume and publish event to client specified topic'() {
         given: 'consumer subscribing to client topic'
-            kafkaConsumer.subscribe([clientTopic])
+            legacyEventKafkaConsumer.subscribe([clientTopic])
         and: 'consumer record for batch event'
             def consumerRecordIn = createConsumerRecord(batchEventType)
         when: 'the batch event is consumed and published to client specified topic'
             asyncBatchEventConsumer.consumeAndPublish(consumerRecordIn)
         and: 'the client specified topic is polled'
-            def consumerRecordOut = kafkaConsumer.poll(Duration.ofMillis(1500))[0]
+            def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
         then: 'verifying consumed event operationID is same as published event operationID'
             def operationIdIn = consumerRecordIn.value.event.batchResponses[0].operationId
             def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), BatchDataResponseEventV1.class).event.batchResponses[0].operationId
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy
new file mode 100644
index 0000000..ed5f161
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaTemplateConfigSpec.groovy
@@ -0,0 +1,62 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2023 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.config.kafka;
+
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventDeserializer
+import io.cloudevents.kafka.CloudEventSerializer
+import org.spockframework.spring.EnableSharedInjection
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties
+import org.springframework.boot.context.properties.EnableConfigurationProperties
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.support.serializer.JsonDeserializer
+import org.springframework.kafka.support.serializer.JsonSerializer
+import spock.lang.Shared
+import spock.lang.Specification
+
+@SpringBootTest(classes = [KafkaProperties, KafkaTemplateConfig])
+@EnableSharedInjection
+@EnableConfigurationProperties
+class KafkaTemplateConfigSpec extends Specification {
+
+    @Shared
+    @Autowired
+    KafkaTemplate<String, String> legacyEventKafkaTemplate
+
+    @Shared
+    @Autowired
+    KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate
+
+    def 'Verify kafka template serializer and deserializer configuration for #eventType.'() {
+        expect: 'kafka template is instantiated'
+            assert kafkaTemplateInstance.properties['beanName'] == beanName
+        and: 'verify event key and value serializer'
+            assert kafkaTemplateInstance.properties['producerFactory'].configs['value.serializer'].asType(String.class).contains(valueSerializer.getCanonicalName())
+        and: 'verify event key and value deserializer'
+            assert kafkaTemplateInstance.properties['consumerFactory'].configs['spring.deserializer.value.delegate.class'].asType(String.class).contains(delegateDeserializer.getCanonicalName())
+        where: 'the following event type is used'
+            eventType      | kafkaTemplateInstance    || beanName                   | valueSerializer      | delegateDeserializer
+            'legacy event' | legacyEventKafkaTemplate || 'legacyEventKafkaTemplate' | JsonSerializer       | JsonDeserializer
+            'cloud event'  | cloudEventKafkaTemplate  || 'cloudEventKafkaTemplate'  | CloudEventSerializer | CloudEventDeserializer
+    }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
index 5f54bbe..3dffac7 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
@@ -24,6 +24,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.header.internals.RecordHeader
+import org.apache.kafka.common.serialization.StringDeserializer
 import org.mapstruct.factory.Mappers
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
@@ -48,7 +49,7 @@
     AvcEventMapper avcEventMapper = Mappers.getMapper(AvcEventMapper.class)
 
     @SpringBean
-    EventsPublisher eventsPublisher = new EventsPublisher<AvcEvent>(kafkaTemplate)
+    EventsPublisher eventsPublisher = new EventsPublisher<AvcEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     @SpringBean
     AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher, avcEventMapper)
@@ -56,13 +57,13 @@
     @Autowired
     JsonObjectMapper jsonObjectMapper
 
-    def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
+    def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
 
     def 'Consume and forward valid message'() {
         given: 'consumer has a subscription on a topic'
             def cmEventsTopicName = 'cm-events'
             acvEventConsumer.cmEventsTopicName = cmEventsTopicName
-            kafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
+            legacyEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
         and: 'an event is sent'
             def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
             def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)
@@ -73,7 +74,7 @@
         when: 'the event is consumed'
             acvEventConsumer.consumeAndForward(consumerRecord)
         and: 'the topic is polled'
-            def records = kafkaConsumer.poll(Duration.ofMillis(1500))
+            def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
         then: 'poll returns one record'
             assert records.size() == 1
         and: 'record can be converted to AVC event'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy
index 9374126..4c68804 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy
@@ -22,6 +22,7 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
 import org.onap.cps.ncmp.events.lcm.v1.Event
@@ -42,12 +43,12 @@
 @DirtiesContext
 class LcmEventsPublisherSpec extends MessagingBaseSpec {
 
-    def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
+    def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
 
     def testTopic = 'ncmp-events-test'
 
     @SpringBean
-    EventsPublisher<LcmEvent> lcmEventsPublisher = new EventsPublisher(kafkaTemplate)
+    EventsPublisher<LcmEvent> lcmEventsPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     @Autowired
     JsonObjectMapper jsonObjectMapper
@@ -82,11 +83,11 @@
                 eventSchema       : eventSchema,
                 eventSchemaVersion: eventSchemaVersion]
         and: 'consumer has a subscription'
-            kafkaConsumer.subscribe([testTopic] as List<String>)
+            legacyEventKafkaConsumer.subscribe([testTopic] as List<String>)
         when: 'an event is published'
             lcmEventsPublisher.publishEvent(testTopic, eventKey, eventHeader, eventData)
         and: 'topic is polled'
-            def records = kafkaConsumer.poll(Duration.ofMillis(1500))
+            def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
         then: 'poll returns one record'
             assert records.size() == 1
         and: 'record key matches the expected event key'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy
index 337178e..603b8cd 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/MessagingBaseSpec.groovy
@@ -20,6 +20,8 @@
 
 package org.onap.cps.ncmp.api.kafka
 
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventSerializer
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.serialization.StringSerializer
 import org.spockframework.spring.SpringBean
@@ -44,30 +46,33 @@
 
     static kafkaTestContainer = new KafkaContainer(DockerImageName.parse('registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1').asCompatibleSubstituteFor('confluentinc/cp-kafka'))
 
-    def producerConfigProperties() {
+    @SpringBean
+    KafkaTemplate legacyEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(eventProducerConfigProperties(JsonSerializer)))
+
+    @SpringBean
+    KafkaTemplate cloudEventKafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, CloudEvent>(eventProducerConfigProperties(CloudEventSerializer)))
+
+    @DynamicPropertySource
+    static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
+        dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
+    }
+
+    def eventProducerConfigProperties(valueSerializer) {
         return [('bootstrap.servers'): kafkaTestContainer.getBootstrapServers().split(',')[0],
                 ('retries')          : 0,
                 ('batch-size')       : 16384,
                 ('linger.ms')        : 1,
                 ('buffer.memory')    : 33554432,
                 ('key.serializer')   : StringSerializer,
-                ('value.serializer') : JsonSerializer]
+                ('value.serializer') : valueSerializer]
     }
 
-    def consumerConfigProperties(consumerGroupId) {
+    def eventConsumerConfigProperties(consumerGroupId, valueSerializer) {
         return [('bootstrap.servers') : kafkaTestContainer.getBootstrapServers().split(',')[0],
                 ('key.deserializer')  : StringDeserializer,
-                ('value.deserializer'): StringDeserializer,
+                ('value.deserializer'): valueSerializer,
                 ('auto.offset.reset') : 'earliest',
                 ('group.id')          : consumerGroupId
         ]
     }
-
-    @SpringBean
-    KafkaTemplate kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties()))
-
-    @DynamicPropertySource
-    static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
-        dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
-    }
 }
diff --git a/cps-ncmp-service/src/test/resources/application.yml b/cps-ncmp-service/src/test/resources/application.yml
index 1016f2b..197bfda 100644
--- a/cps-ncmp-service/src/test/resources/application.yml
+++ b/cps-ncmp-service/src/test/resources/application.yml
@@ -16,6 +16,14 @@
 #  SPDX-License-Identifier: Apache-2.0
 #  ============LICENSE_END=========================================================
 
+spring:
+    kafka:
+        producer:
+            value-serializer: io.cloudevents.kafka.CloudEventSerializer
+        consumer:
+            properties:
+                spring.deserializer.value.delegate.class: io.cloudevents.kafka.CloudEventDeserializer
+
 app:
     ncmp:
         avc: