Merge "Improve performance of updateDataLeaves"
diff --git a/cps-ncmp-service/pom.xml b/cps-ncmp-service/pom.xml
index 19ef988..608141e 100644
--- a/cps-ncmp-service/pom.xml
+++ b/cps-ncmp-service/pom.xml
@@ -104,6 +104,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>kafka</artifactId>
             <scope>test</scope>
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
index 9e2b66a..76cc0c4 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/DataOperationRecordFilterStrategy.java
@@ -20,9 +20,9 @@
 
 package org.onap.cps.ncmp.api.impl.async;
 
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.kafka.common.header.Header;
-import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.impl.KafkaHeaders;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
@@ -32,6 +32,7 @@
  *
  */
 @Configuration
+@Slf4j
 public class DataOperationRecordFilterStrategy {
 
     /**
@@ -41,15 +42,14 @@
      * @return boolean value.
      */
     @Bean
-    public RecordFilterStrategy<String, DataOperationEvent> includeDataOperationEventsOnly() {
+    public RecordFilterStrategy<String, CloudEvent> includeDataOperationEventsOnly() {
         return consumedRecord -> {
-            final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType");
-            if (eventTypeHeader == null) {
-                return false;
+            final String eventTypeHeaderValue = KafkaHeaders.getParsedKafkaHeader(consumedRecord.headers(), "ce_type");
+            if (eventTypeHeaderValue == null) {
+                log.trace("No ce_type header found, possibly a legacy event (ignored)");
+                return true; // true = discard: records without a ce_type header never reach the listener
             }
-            final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value());
-            return !(eventTypeHeaderValue != null
-                    && eventTypeHeaderValue.contains("DataOperationEvent"));
+            return !(eventTypeHeaderValue.contains("DataOperationEvent")); // false = keep the record
         };
     }
 }
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java
index 995a4d5..4a0ec5c 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumer.java
@@ -20,12 +20,12 @@
 
 package org.onap.cps.ncmp.api.impl.async;
 
+import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.impl.KafkaHeaders;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
@@ -39,7 +39,7 @@
 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
 public class NcmpAsyncDataOperationEventConsumer {
 
-    private final EventsPublisher<DataOperationEvent> eventsPublisher;
+    private final EventsPublisher<CloudEvent> eventsPublisher;
 
     /**
      * Consume the DataOperationResponseEvent published by producer to topic 'async-m2m.topic'
@@ -52,14 +52,12 @@
             filter = "includeDataOperationEventsOnly",
             groupId = "ncmp-data-operation-event-group",
             properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"})
-    public void consumeAndPublish(final ConsumerRecord<String, DataOperationEvent>
-                                              dataOperationEventConsumerRecord) {
+    public void consumeAndPublish(final ConsumerRecord<String, CloudEvent> dataOperationEventConsumerRecord) {
         log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value());
-        final String eventTarget = SerializationUtils
-                .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventTarget").value());
-        final String eventId = SerializationUtils
-                .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventId").value());
-        eventsPublisher.publishEvent(eventTarget, eventId, dataOperationEventConsumerRecord.headers(),
-                dataOperationEventConsumerRecord.value());
+        final String eventTarget = KafkaHeaders.getParsedKafkaHeader(
+                dataOperationEventConsumerRecord.headers(), "ce_destination");
+        final String eventId = KafkaHeaders.getParsedKafkaHeader(
+                dataOperationEventConsumerRecord.headers(), "ce_id");
+        eventsPublisher.publishCloudEvent(eventTarget, eventId, dataOperationEventConsumerRecord.value());
     }
 }
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
index e61e772..05c731d 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
@@ -71,7 +71,7 @@
      * @param event     message payload
      * @deprecated This method is not needed anymore since the use of headers will be in place.
      */
-    @Deprecated
+    @Deprecated(forRemoval = true)
     public void publishEvent(final String topicName, final String eventKey, final T event) {
         final ListenableFuture<SendResult<String, T>> eventFuture
                 = legacyKafkaEventTemplate.send(topicName, eventKey, event);
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy
new file mode 100644
index 0000000..c0bdf3d
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerIntegrationSpec.groovy
@@ -0,0 +1,99 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2023 Nordix Foundation.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an 'AS IS' BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.async
+
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.builder.CloudEventBuilder
+import io.cloudevents.kafka.CloudEventSerializer
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.StringSerializer
+import org.onap.cps.ncmp.api.impl.events.EventsPublisher
+import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
+import org.spockframework.spring.SpringBean
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.Value
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry
+import org.springframework.kafka.test.utils.ContainerTestUtils
+import org.springframework.test.annotation.DirtiesContext
+import org.testcontainers.spock.Testcontainers
+import java.util.concurrent.TimeUnit
+
+@SpringBootTest(classes =[NcmpAsyncDataOperationEventConsumer, DataOperationRecordFilterStrategy])
+@DirtiesContext
+@Testcontainers
+@EnableAutoConfiguration
+class NcmpAsyncDataOperationEventConsumerIntegrationSpec extends MessagingBaseSpec {
+
+    @SpringBean
+    EventsPublisher mockEventsPublisher = Mock()
+
+    @Autowired
+    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry
+
+    @Value('${app.ncmp.async-m2m.topic}')
+    def topic
+
+    def setup() {
+        activateListeners() // block until the Kafka listener containers have their partition assigned
+    }
+
+    // Verifies the listener's record filter: only events whose type contains 'DataOperationEvent' are forwarded.
+    def 'Filtering Cloud Events on Type.'() {
+        given: 'a cloud event of type: #eventType'
+            def cloudEvent = CloudEventBuilder.v1().withId('some id').withType(eventType)
+                    .withSource(URI.create('some-source')).build()
+        when: 'send the cloud event'
+            ProducerRecord<String, CloudEvent> record = new ProducerRecord<>(topic, cloudEvent)
+            KafkaProducer<String, CloudEvent> producer = new KafkaProducer<>(eventProducerConfigProperties(CloudEventSerializer))
+            producer.send(record)
+            producer.close() // blocks until the pending send has completed, then releases the producer
+        and: 'wait a little for async processing of message'
+            TimeUnit.MILLISECONDS.sleep(100)
+        then: 'the event has only been forwarded for the correct type'
+            expectedNumberOfCallsToPublishForwardedEvent * mockEventsPublisher.publishCloudEvent(*_)
+        where: 'the following event types are used'
+            eventType                                        || expectedNumberOfCallsToPublishForwardedEvent
+            'DataOperationEvent'                             || 1
+            'other type'                                     || 0
+            'any type contain the word "DataOperationEvent"' || 1
+    }
+
+    def 'Non cloud events on same Topic.'() { // a plain String record must never reach the publisher
+        when: 'sending a non-cloud event on the same topic'
+            KafkaProducer<String, String> producer = new KafkaProducer<>(eventProducerConfigProperties(StringSerializer))
+            producer.send(new ProducerRecord<String, String>(topic, 'simple string event'))
+            producer.close() // blocks until the pending send has completed, then releases the producer
+        and: 'wait a little for async processing of message'
+            TimeUnit.MILLISECONDS.sleep(100)
+        then: 'the event is not processed by this consumer'
+            0 * mockEventsPublisher.publishCloudEvent(*_)
+    }
+
+    // Waits, for every registered Kafka listener container, until it has 1 partition assigned.
+    def activateListeners() {
+        def listenerContainers = kafkaListenerEndpointRegistry.listenerContainers
+        for (listenerContainer in listenerContainers) { ContainerTestUtils.waitForAssignment(listenerContainer, 1) }
+    }
+
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy
index d9b9ce6..7f8469a 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/NcmpAsyncDataOperationEventConsumerSpec.groovy
@@ -21,11 +21,16 @@
 package org.onap.cps.ncmp.api.impl.async
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.commons.lang3.SerializationUtils
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventDeserializer
+import io.cloudevents.kafka.CloudEventSerializer
+import io.cloudevents.kafka.impl.KafkaHeaders
+import io.cloudevents.core.CloudEventUtils
+import io.cloudevents.core.builder.CloudEventBuilder
+import io.cloudevents.jackson.PojoCloudEventDataMapper
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.header.internals.RecordHeader
-import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.header.internals.RecordHeaders
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
 import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
@@ -45,41 +50,56 @@
 class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec {
 
     @SpringBean
-    EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<DataOperationEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     @SpringBean
-    NcmpAsyncDataOperationEventConsumer asyncDataOperationEventConsumer = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher)
+    NcmpAsyncDataOperationEventConsumer objectUnderTest = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher)
 
     @Autowired
     JsonObjectMapper jsonObjectMapper
 
     @Autowired
-    RecordFilterStrategy<String, DataOperationEvent> recordFilterStrategy
+    RecordFilterStrategy<String, CloudEvent> dataOperationRecordFilterStrategy
 
-    def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer))
+    @Autowired
+    ObjectMapper objectMapper
+
+    def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer))
     def static clientTopic = 'client-topic'
     def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'
 
     def 'Consume and publish event to client specified topic'() {
         given: 'consumer subscribing to client topic'
-            legacyEventKafkaConsumer.subscribe([clientTopic])
+            cloudEventKafkaConsumer.subscribe([clientTopic])
         and: 'consumer record for data operation event'
             def consumerRecordIn = createConsumerRecord(dataOperationType)
         when: 'the data operation event is consumed and published to client specified topic'
-            asyncDataOperationEventConsumer.consumeAndPublish(consumerRecordIn)
+            objectUnderTest.consumeAndPublish(consumerRecordIn)
         and: 'the client specified topic is polled'
-            def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
-        then: 'verifying consumed event operationID is same as published event operationID'
-            def operationIdIn = consumerRecordIn.value.data.responses[0].operationId
-            def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), DataOperationEvent.class).data.responses[0].operationId
-            assert operationIdIn == operationIdOut
+            def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0] // first record of the poll
+        then: 'verify cloud compliant headers'
+            def consumerRecordOutHeaders = consumerRecordOut.headers()
+            assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_correlationid') == 'request-id'
+            assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_id') == 'some-uuid'
+            assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_type') == dataOperationType
+        and: 'verify that extension is included into header'
+            assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_destination') == clientTopic
+        and: 'map consumer record to expected event type'
+            def dataOperationResponseEvent = CloudEventUtils.mapData(consumerRecordOut.value(),
+                    PojoCloudEventDataMapper.from(objectMapper, DataOperationEvent.class)).getValue()
+        and: 'verify published response data properties'
+            def response = dataOperationResponseEvent.data.responses[0]
+            response.operationId == 'some-operation-id'
+            response.statusCode == 'any-success-status-code'
+            response.statusMessage == 'Successfully applied changes'
+            response.responseContent as String == '[some-key:some-value]' // string form of the fixture's responseContent
     }
 
     def 'Filter an event with type #eventType'() {
         given: 'consumer record for event with type #eventType'
             def consumerRecord = createConsumerRecord(eventType)
         when: 'while consuming the topic ncmp-async-m2m it executes the filter strategy'
-            def result = recordFilterStrategy.filter(consumerRecord)
+            def result = dataOperationRecordFilterStrategy.filter(consumerRecord)
         then: 'the event is #description'
             assert result == expectedResult
         where: 'filter the event based on the eventType #eventType'
@@ -90,14 +110,27 @@
 
     def createConsumerRecord(eventTypeAsString) {
         def jsonData = TestUtils.getResourceFileContent('dataOperationEvent.json')
-        def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class)
-        def eventTarget = SerializationUtils.serialize(clientTopic)
-        def eventType = SerializationUtils.serialize(eventTypeAsString)
-        def eventId = SerializationUtils.serialize('12345')
-        def consumerRecord = new ConsumerRecord<String, Object>(clientTopic, 1, 1L, '123', testEventSent)
-        consumerRecord.headers().add(new RecordHeader('eventId', eventId))
-        consumerRecord.headers().add(new RecordHeader('eventTarget', eventTarget))
-        consumerRecord.headers().add(new RecordHeader('eventType', eventType))
+        def testEventSentAsBytes = objectMapper.writeValueAsBytes(jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class))
+
+        CloudEvent cloudEvent = getCloudEvent(eventTypeAsString, testEventSentAsBytes)
+
+        def headers = new RecordHeaders()
+        def cloudEventSerializer = new CloudEventSerializer()
+        cloudEventSerializer.serialize(clientTopic, headers, cloudEvent) // result unused: called to fill 'headers'
+
+        def consumerRecord = new ConsumerRecord<String, CloudEvent>(clientTopic, 0, 0L, 'sample-message-key', cloudEvent)
+        headers.forEach(header -> consumerRecord.headers().add(header)) // copy the collected headers onto the record
         return consumerRecord
     }
+
+    // Test fixture: a v1 CloudEvent with a fixed id and source, the given type and payload,
+    // and the 'correlationid' and 'destination' extensions.
+    def getCloudEvent(eventTypeAsString, byte[] testEventSentAsBytes) {
+        def eventSource = URI.create("sample-test-source")
+        def builder = CloudEventBuilder.v1().withId("some-uuid").withType(eventTypeAsString).withSource(eventSource)
+        builder = builder.withData(testEventSentAsBytes)
+        builder = builder.withExtension("correlationid", "request-id")
+        builder = builder.withExtension("destination", clientTopic)
+        return builder.build()
+    }
 }
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
index 4a9e3ee..5cc70e2 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy
@@ -39,7 +39,6 @@
 import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.test.annotation.DirtiesContext
 import org.testcontainers.spock.Testcontainers
-
 import java.time.Duration
 
 @SpringBootTest(classes = [EventsPublisher, AvcEventConsumer, ObjectMapper, JsonObjectMapper])
@@ -85,8 +84,8 @@
             assert records.size() == 1
         and: 'record can be converted to AVC event'
             def record = records.iterator().next()
-            def cloudevent = record.value() as CloudEvent
-            def convertedAvcEvent = CloudEventUtils.mapData(cloudevent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue()
+            def cloudEvent = record.value() as CloudEvent
+            def convertedAvcEvent = CloudEventUtils.mapData(cloudEvent, PojoCloudEventDataMapper.from(new ObjectMapper(), AvcEvent.class)).getValue()
         and: 'we have correct headers forwarded where correlation id matches'
             assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1'
         and: 'event id differs(as per requirement) between consumed and forwarded'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionCreateProducerDemo.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionCreateProducerDemo.groovy
deleted file mode 100644
index 54a7ad3..0000000
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionCreateProducerDemo.groovy
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *  ============LICENSE_START=======================================================
- *  Copyright (C) 2023 Nordix Foundation
- *  ================================================================================
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *        http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- *  SPDX-License-Identifier: Apache-2.0
- *  ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.events.avc
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import org.onap.cps.ncmp.event.model.SubscriptionEvent
-import org.onap.cps.ncmp.utils.KafkaDemoProducerConfig
-import org.onap.cps.ncmp.utils.TestUtils
-import org.onap.cps.utils.JsonObjectMapper
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.beans.factory.annotation.Value
-import org.springframework.boot.test.context.SpringBootTest
-import org.springframework.kafka.core.KafkaTemplate
-import org.springframework.test.annotation.DirtiesContext
-import spock.lang.Specification
-
-@SpringBootTest(classes = [KafkaDemoProducerConfig, ObjectMapper, JsonObjectMapper])
-@DirtiesContext
-class SubscriptionCreateProducerDemo extends Specification {
-
-    @Value('${app.ncmp.avc.subscription-topic}')
-    String subscriptionTopic;
-
-    @Autowired
-    KafkaTemplate<String, SubscriptionEvent> kafkaTemplate
-
-    @Autowired
-    JsonObjectMapper jsonObjectMapper
-
-    def 'produce subscription creation data event for testing'() {
-        given: 'avc subscription creation event data'
-            def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
-            def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class)
-        and: 'test event is sent'
-            kafkaTemplate.send(subscriptionTopic, "request-Id-98765", testEventSent);
-        and: 'print json data to console'
-            println(jsonData);
-    }
-}
\ No newline at end of file
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CompositeStateSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CompositeStateSpec.groovy
index 3ae6348..7bdf335 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CompositeStateSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CompositeStateSpec.groovy
@@ -22,7 +22,6 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import spock.lang.Specification
-
 import java.time.OffsetDateTime
 import java.time.ZoneOffset
 import java.time.format.DateTimeFormatter
diff --git a/cps-ncmp-service/src/test/resources/application.yml b/cps-ncmp-service/src/test/resources/application.yml
index 197bfda..df34f84 100644
--- a/cps-ncmp-service/src/test/resources/application.yml
+++ b/cps-ncmp-service/src/test/resources/application.yml
@@ -26,6 +26,8 @@
 
 app:
     ncmp:
+        async-m2m:
+            topic: ncmp-async-m2m
         avc:
             subscription-topic: cm-avc-subscription
             cm-events-topic: cm-events
diff --git a/cps-ncmp-service/src/test/resources/dataOperationEvent.json b/cps-ncmp-service/src/test/resources/dataOperationEvent.json
index 42268c0..0a32f38 100644
--- a/cps-ncmp-service/src/test/resources/dataOperationEvent.json
+++ b/cps-ncmp-service/src/test/resources/dataOperationEvent.json
@@ -1,30 +1,15 @@
 {
-  "data":{
-    "responses":[
+  "data": {
+    "responses": [
       {
-        "operationId":"1",
-        "ids":[
-          "123",
-          "124"
+        "operationId": "some-operation-id",
+        "ids": [
+          "cm-handle-id"
         ],
-        "statusCode":1,
-        "statusMessage":"Batch operation success on the above cmhandle ids ",
-        "responseContent":{
-          "ietf-netconf-monitoring:netconf-state":{
-            "schemas":{
-              "schema":[
-                {
-                  "identifier":"ietf-tls-server",
-                  "version":"2016-11-02",
-                  "format":"ietf-netconf-monitoring:yang",
-                  "namespace":"urn:ietf:params:xml:ns:yang:ietf-tls-server",
-                  "location":[
-                    "NETCONF"
-                  ]
-                }
-              ]
-            }
-          }
+        "statusCode": "any-success-status-code",
+        "statusMessage": "Successfully applied changes",
+        "responseContent": {
+          "some-key": "some-value"
         }
       }
     ]
diff --git a/docs/release-notes.rst b/docs/release-notes.rst
index 60050c1..e8eb018 100755
--- a/docs/release-notes.rst
+++ b/docs/release-notes.rst
@@ -1105,17 +1105,19 @@
 References
 ----------
 
-For more information on the ONAP Honolulu release, please see:
+For more information on the latest ONAP release, please see:
 
 #. `ONAP Home Page`_
-#. `ONAP Documentation`_
-#. `ONAP Release Downloads`_
 #. `ONAP Wiki Page`_
+#. `ONAP Documentation`_
+#. `ONAP CPS Documentation`_
+#. `ONAP Release Downloads`_
 
 
 .. _`ONAP Home Page`: https://www.onap.org
 .. _`ONAP Wiki Page`: https://wiki.onap.org
 .. _`ONAP Documentation`: https://docs.onap.org
+.. _`ONAP CPS Documentation`: https://docs.onap.org/projects/onap-cps
 .. _`ONAP Release Downloads`: https://git.onap.org
 
 Quick Links: