Read and send Response (DMI <> NCMP) for CmSubscription

-Update subscription code to new models

Issue-ID: CPS-1971
Change-Id: I382ca31407f8088ddea889a7ab904a22c09789ff
Signed-off-by: seanbeirne <sean.beirne@est.tech>
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/exception/CloudEventConstructionException.java b/src/main/java/org/onap/cps/ncmp/dmi/exception/CloudEventConstructionException.java
new file mode 100644
index 0000000..f61c156
--- /dev/null
+++ b/src/main/java/org/onap/cps/ncmp/dmi/exception/CloudEventConstructionException.java
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.exception;
+
+public class CloudEventConstructionException extends DmiException {
+
+    private static final long serialVersionUID = 7747941311132087621L;
+
+    /**
+     * CloudEventConstructionException.
+     *
+     * @param message the error message
+     * @param details the error details
+     * @param cause   the error cause
+     */
+    public CloudEventConstructionException(final String message, final String details, final Throwable cause) {
+        super(message, details, cause);
+    }
+}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/cmsubscription/CmNotificationSubscriptionDmiInEventConsumer.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/cmsubscription/CmNotificationSubscriptionDmiInEventConsumer.java
new file mode 100644
index 0000000..ecfef6f
--- /dev/null
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/cmsubscription/CmNotificationSubscriptionDmiInEventConsumer.java
@@ -0,0 +1,108 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.cmsubscription;
+
+import io.cloudevents.CloudEvent;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.onap.cps.ncmp.dmi.notifications.cmsubscription.model.CmNotificationSubscriptionStatus;
+import org.onap.cps.ncmp.dmi.notifications.mapper.CloudEventMapper;
+import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent;
+import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data;
+import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Service;
+
+@Service
+@RequiredArgsConstructor
+public class CmNotificationSubscriptionDmiInEventConsumer {
+
+
+    @Value("${app.dmi.avc.subscription-response-topic}")
+    private String cmNotificationSubscriptionResponseTopic;
+    @Value("${dmi.service.name}")
+    private String dmiName;
+    private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
+
+    /**
+     * Consume the cmNotificationSubscriptionDmiInCloudEvent event.
+     *
+     * @param cmNotificationSubscriptionDmiInCloudEvent the event to be consumed
+     */
+    @KafkaListener(topics = "${app.dmi.avc.subscription-topic}",
+        containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+    public void consumeCmNotificationSubscriptionDmiInEvent(
+        final ConsumerRecord<String, CloudEvent> cmNotificationSubscriptionDmiInCloudEvent) {
+        final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent =
+            CloudEventMapper.toTargetEvent(cmNotificationSubscriptionDmiInCloudEvent.value(),
+                CmNotificationSubscriptionDmiInEvent.class);
+        if (cmNotificationSubscriptionDmiInEvent != null) {
+            final String subscriptionId = cmNotificationSubscriptionDmiInCloudEvent.value().getId();
+            final String subscriptionType = cmNotificationSubscriptionDmiInCloudEvent.value().getType();
+            final String correlationId = String.valueOf(cmNotificationSubscriptionDmiInCloudEvent.value()
+                .getExtension("correlationid"));
+
+            if ("subscriptionCreated".equals(subscriptionType)) {
+                createAndSendCmNotificationSubscriptionDmiOutEvent(subscriptionId, "subscriptionCreateResponse",
+                    correlationId, CmNotificationSubscriptionStatus.ACCEPTED);
+            } else if ("subscriptionDeleted".equals(subscriptionType)) {
+                createAndSendCmNotificationSubscriptionDmiOutEvent(subscriptionId, "subscriptionDeleteResponse",
+                    correlationId, CmNotificationSubscriptionStatus.ACCEPTED);
+            }
+        }
+    }
+
+    /**
+     * Create Dmi out event object and send to response topic.
+     *
+     * @param eventKey the events key
+     * @param subscriptionType the subscriptions type
+     * @param correlationId the events correlation Id
+     * @param cmNotificationSubscriptionStatus subscriptions status accepted/rejected
+     */
+    public void createAndSendCmNotificationSubscriptionDmiOutEvent(
+        final String eventKey, final String subscriptionType, final String correlationId,
+        final CmNotificationSubscriptionStatus cmNotificationSubscriptionStatus) {
+
+        final CmNotificationSubscriptionDmiOutEvent cmNotificationSubscriptionDmiOutEvent =
+            new CmNotificationSubscriptionDmiOutEvent();
+        final Data cmNotificationSubscriptionDmiOutEventData = new Data();
+
+        if (cmNotificationSubscriptionStatus.equals(CmNotificationSubscriptionStatus.ACCEPTED)) {
+            cmNotificationSubscriptionDmiOutEventData.setStatusCode("1");
+            cmNotificationSubscriptionDmiOutEventData.setStatusMessage("ACCEPTED");
+        } else if (cmNotificationSubscriptionStatus.equals(CmNotificationSubscriptionStatus.REJECTED)) {
+            cmNotificationSubscriptionDmiOutEventData.setStatusCode("2");
+            cmNotificationSubscriptionDmiOutEventData.setStatusMessage("REJECTED");
+        }
+        cmNotificationSubscriptionDmiOutEvent.setData(cmNotificationSubscriptionDmiOutEventData);
+
+        cloudEventKafkaTemplate.send(cmNotificationSubscriptionResponseTopic, eventKey,
+            CmNotificationSubscriptionDmiOutEventToCloudEventMapper.toCloudEvent(cmNotificationSubscriptionDmiOutEvent,
+                subscriptionType, dmiName, correlationId));
+
+    }
+
+
+
+}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/cmsubscription/CmNotificationSubscriptionDmiOutEventToCloudEventMapper.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/cmsubscription/CmNotificationSubscriptionDmiOutEventToCloudEventMapper.java
new file mode 100644
index 0000000..51205da
--- /dev/null
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/cmsubscription/CmNotificationSubscriptionDmiOutEventToCloudEventMapper.java
@@ -0,0 +1,62 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.cmsubscription;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import java.net.URI;
+import java.util.UUID;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.onap.cps.ncmp.dmi.exception.CloudEventConstructionException;
+import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class CmNotificationSubscriptionDmiOutEventToCloudEventMapper {
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Maps SubscriptionEventResponse to a CloudEvent.
+     *
+     * @param cmSubscriptionDmiOutEvent object.
+     * @param subscriptionType          String of subscription type.
+     * @param dmiName                   String of dmiName.
+     * @param correlationId             String of correlationId.
+     * @return CloudEvent built.
+     */
+    public static CloudEvent toCloudEvent(final CmNotificationSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent,
+                                          final String subscriptionType, final String dmiName,
+                                          final String correlationId) {
+        try {
+            return CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create(dmiName))
+                .withType(subscriptionType)
+                .withDataSchema(URI.create("urn:cps:org.onap.ncmp.dmi.cm.subscription:1.0.0"))
+                .withExtension("correlationid", correlationId)
+                .withData(objectMapper.writeValueAsBytes(cmSubscriptionDmiOutEvent)).build();
+        } catch (final Exception ex) {
+            throw new CloudEventConstructionException("The Cloud Event could not be constructed",
+                "Invalid object passed", ex);
+        }
+    }
+
+}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/cmsubscription/model/CmNotificationSubscriptionStatus.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/cmsubscription/model/CmNotificationSubscriptionStatus.java
new file mode 100644
index 0000000..40b1297
--- /dev/null
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/cmsubscription/model/CmNotificationSubscriptionStatus.java
@@ -0,0 +1,32 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.cmsubscription.model;
+
+public enum CmNotificationSubscriptionStatus {
+
+    ACCEPTED("ACCEPTED"), REJECTED("REJECTED");
+
+    private final String cmNotificationSubscriptionStatusValue;
+
+    CmNotificationSubscriptionStatus(final String cmNotificationSubscriptionStatusValue) {
+        this.cmNotificationSubscriptionStatusValue = cmNotificationSubscriptionStatusValue;
+    }
+}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/mapper/CloudEventMapper.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/mapper/CloudEventMapper.java
new file mode 100644
index 0000000..8f196cf
--- /dev/null
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/mapper/CloudEventMapper.java
@@ -0,0 +1,62 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.mapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.CloudEventUtils;
+import io.cloudevents.core.data.PojoCloudEventData;
+import io.cloudevents.jackson.PojoCloudEventDataMapper;
+import io.cloudevents.rw.CloudEventRWException;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class CloudEventMapper {
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Generic method to map cloud event data to target event class object.
+     *
+     * @param cloudEvent       input cloud event
+     * @param targetEventClass target event class
+     * @param <T>              target event class type
+     * @return mapped target event
+     */
+    public static <T> T toTargetEvent(final CloudEvent cloudEvent, final Class<T> targetEventClass) {
+        PojoCloudEventData<T> mappedCloudEvent = null;
+
+        try {
+            mappedCloudEvent =
+                CloudEventUtils.mapData(cloudEvent, PojoCloudEventDataMapper.from(objectMapper, targetEventClass));
+
+        } catch (final CloudEventRWException cloudEventRwException) {
+            log.error("Unable to map cloud event to target event class type : {} with cause : {}", targetEventClass,
+                cloudEventRwException.getMessage());
+        }
+
+        return mappedCloudEvent == null ? null : mappedCloudEvent.getValue();
+    }
+
+}
\ No newline at end of file
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/cmsubscription/CmNotificationSubscriptionDmiInEventConsumerSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/cmsubscription/CmNotificationSubscriptionDmiInEventConsumerSpec.groovy
new file mode 100644
index 0000000..4795343
--- /dev/null
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/cmsubscription/CmNotificationSubscriptionDmiInEventConsumerSpec.groovy
@@ -0,0 +1,144 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.cmsubscription
+
+import ch.qos.logback.classic.Level
+import ch.qos.logback.classic.Logger
+import ch.qos.logback.classic.spi.ILoggingEvent
+import ch.qos.logback.core.read.ListAppender
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.builder.CloudEventBuilder
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.onap.cps.ncmp.dmi.TestUtils
+import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
+import org.onap.cps.ncmp.dmi.notifications.cmsubscription.model.CmNotificationSubscriptionStatus
+import org.onap.cps.ncmp.dmi.notifications.mapper.CloudEventMapper
+import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent
+import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data
+import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent
+import org.slf4j.LoggerFactory
+import org.spockframework.spring.SpringBean
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.test.annotation.DirtiesContext
+import org.testcontainers.spock.Testcontainers
+
+import java.sql.Timestamp
+import java.time.Duration
+import java.time.OffsetDateTime
+import java.time.ZoneId
+
+
+@SpringBootTest(classes = [CmNotificationSubscriptionDmiInEventConsumer])
+@Testcontainers
+@DirtiesContext
+class CmNotificationSubscriptionDmiInEventConsumerSpec extends MessagingBaseSpec {
+    def objectMapper = new ObjectMapper()
+    def testTopic = 'dmi-ncmp-cm-avc-subscription'
+
+    @SpringBean
+    CmNotificationSubscriptionDmiInEventConsumer objectUnderTest = new CmNotificationSubscriptionDmiInEventConsumer(cloudEventKafkaTemplate)
+
+    def logger = Spy(ListAppender<ILoggingEvent>)
+
+    void setup() {
+        ((Logger) LoggerFactory.getLogger(CloudEventMapper.class)).addAppender(logger)
+        logger.start()
+    }
+
+    void cleanup() {
+        ((Logger) LoggerFactory.getLogger(CloudEventMapper.class)).detachAndStopAllAppenders()
+    }
+
+    def 'Sends subscription cloud event response successfully.'() {
+        given: 'an subscription event response'
+            objectUnderTest.dmiName = 'test-ncmp-dmi'
+            objectUnderTest.cmNotificationSubscriptionResponseTopic = testTopic
+            def correlationId = 'test-subscriptionId#test-ncmp-dmi'
+            def cmSubscriptionDmiOutEventData = new Data(statusCode: '1', statusMessage: 'ACCEPTED')
+            def subscriptionEventResponse =
+                    new CmNotificationSubscriptionDmiOutEvent().withData(cmSubscriptionDmiOutEventData)
+        and: 'consumer has a subscription'
+            kafkaConsumer.subscribe([testTopic] as List<String>)
+        when: 'an event is published'
+            def eventKey = UUID.randomUUID().toString()
+            objectUnderTest.createAndSendCmNotificationSubscriptionDmiOutEvent(eventKey, "subscriptionCreatedStatus", correlationId, CmNotificationSubscriptionStatus.ACCEPTED)
+        and: 'topic is polled'
+            def records = kafkaConsumer.poll(Duration.ofMillis(1500))
+        then: 'poll returns one record'
+            assert records.size() == 1
+            def record = records.iterator().next()
+        and: 'the record value matches the expected event value'
+            def expectedValue = objectMapper.writeValueAsString(subscriptionEventResponse)
+            assert expectedValue == record.value
+            assert eventKey == record.key
+    }
+
+    def 'Consume valid message.'() {
+        given: 'an event'
+            objectUnderTest.dmiName = 'test-ncmp-dmi'
+            def eventKey = UUID.randomUUID().toString()
+            def timestamp = new Timestamp(1679521929511)
+            def jsonData = TestUtils.getResourceFileContent('cmNotificationSubscriptionCreationEvent.json')
+            def subscriptionEvent = objectMapper.readValue(jsonData, CmNotificationSubscriptionDmiInEvent.class)
+            objectUnderTest.cmNotificationSubscriptionResponseTopic = testTopic
+            def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
+                    .withType(subscriptionType)
+                    .withDataSchema(URI.create("urn:cps:" + CmNotificationSubscriptionDmiInEvent.class.getName() + ":1.0.0"))
+                    .withExtension("correlationid", eventKey)
+                    .withTime(OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC")))
+                    .withData(objectMapper.writeValueAsBytes(subscriptionEvent)).build()
+            def testEventSent = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
+        when: 'the valid event is consumed'
+            objectUnderTest.consumeCmNotificationSubscriptionDmiInEvent(testEventSent)
+        then: 'no exception is thrown'
+            noExceptionThrown()
+        where: 'given #scenario'
+            scenario                    | subscriptionType
+            'Subscription Create Event' | "subscriptionCreated"
+            'Subscription Delete Event' | "subscriptionDeleted"
+    }
+
+    def 'Consume invalid message.'() {
+        given: 'an invalid event body'
+            objectUnderTest.dmiName = 'test-ncmp-dmi'
+            def eventKey = UUID.randomUUID().toString()
+            def timestamp = new Timestamp(1679521929511)
+            def invalidJsonBody = "/////"
+            objectUnderTest.cmNotificationSubscriptionResponseTopic = testTopic
+            def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
+                    .withType("subscriptionCreated")
+                    .withDataSchema(URI.create("urn:cps:org.onap.ncmp.dmi.cm.subscription:1.0.0"))
+                    .withTime(OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC")))
+                    .withExtension("correlationid", eventKey).withData(objectMapper.writeValueAsBytes(invalidJsonBody)).build()
+            def testEventSent = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
+        when: 'the invalid event is consumed'
+            objectUnderTest.consumeCmNotificationSubscriptionDmiInEvent(testEventSent)
+        then: 'exception is thrown and event is logged'
+            def loggingEvent = getLoggingEvent()
+            assert loggingEvent.level == Level.ERROR
+            assert loggingEvent.formattedMessage.contains('Unable to map cloud event to target event class type')
+    }
+
+    def getLoggingEvent() {
+        return logger.list[0]
+    }
+}
\ No newline at end of file
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/cmsubscription/CmNotificationSubscriptionDmiOutEventToCloudEventMapperSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/cmsubscription/CmNotificationSubscriptionDmiOutEventToCloudEventMapperSpec.groovy
new file mode 100644
index 0000000..8ca629f
--- /dev/null
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/cmsubscription/CmNotificationSubscriptionDmiOutEventToCloudEventMapperSpec.groovy
@@ -0,0 +1,69 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.cmsubscription
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.core.builder.CloudEventBuilder
+import org.onap.cps.ncmp.dmi.exception.CloudEventConstructionException
+import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent
+import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data
+import org.spockframework.spring.SpringBean
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import spock.lang.Specification
+
+@SpringBootTest(classes = [ObjectMapper])
+class CmNotificationSubscriptionDmiOutEventToCloudEventMapperSpec extends Specification {
+
+    @Autowired
+    def objectMapper = new ObjectMapper()
+
+    @SpringBean
+    CmNotificationSubscriptionDmiOutEventToCloudEventMapper objectUnderTest = new CmNotificationSubscriptionDmiOutEventToCloudEventMapper()
+
+    def 'Convert a Cm Subscription DMI Out Event to CloudEvent successfully.'() {
+        given: 'a Cm Subscription DMI Out Event and an event key'
+            def dmiName = 'test-ncmp-dmi'
+            def correlationId = 'subscription1#test-ncmp-dmi'
+            def cmSubscriptionDmiOutEventData = new Data(statusCode: "1", statusMessage: "accepted")
+            def cmSubscriptionDmiOutEvent =
+                    new CmNotificationSubscriptionDmiOutEvent().withData(cmSubscriptionDmiOutEventData)
+        when: 'a Cm Subscription DMI Out Event is converted'
+            def result = objectUnderTest.toCloudEvent(cmSubscriptionDmiOutEvent, "subscriptionCreatedStatus", dmiName, correlationId)
+        then: 'Cm Subscription DMI Out Event is converted as expected'
+            def expectedCloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
+                    .withType("subscriptionCreated")
+                    .withDataSchema(URI.create("urn:cps:" + CmNotificationSubscriptionDmiOutEvent.class.getName() + ":1.0.0"))
+                    .withExtension("correlationid", correlationId)
+                    .withData(objectMapper.writeValueAsBytes(cmSubscriptionDmiOutEvent)).build()
+            assert expectedCloudEvent.data == result.data
+    }
+
+    def 'Map the Cloud Event to data of the subscription event with null parameters causes an exception'() {
+        given: 'an empty subscription response event and event key'
+            def correlationId = 'subscription1#test-ncmp-dmi'
+            def cmSubscriptionDmiOutEvent = new CmNotificationSubscriptionDmiOutEvent()
+        when: 'the cm subscription dmi out Event map to data of cloud event'
+            objectUnderTest.toCloudEvent(cmSubscriptionDmiOutEvent, "subscriptionCreatedStatus", null , correlationId)
+        then: 'a run time exception is thrown'
+            thrown(CloudEventConstructionException)
+    }
+}
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/mapper/CloudEventMapperSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/mapper/CloudEventMapperSpec.groovy
new file mode 100644
index 0000000..0b40477
--- /dev/null
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/mapper/CloudEventMapperSpec.groovy
@@ -0,0 +1,53 @@
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (c) 2024 Nordix Foundation.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an 'AS IS' BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.mapper
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.core.builder.CloudEventBuilder
+import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.client_to_ncmp.CmNotificationSubscriptionNcmpInEvent
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import spock.lang.Specification
+
+@SpringBootTest(classes = [ObjectMapper])
+class CloudEventMapperSpec extends Specification {
+
+    @Autowired
+    ObjectMapper objectMapper
+
+    def 'Cloud event to Target event type when it is #scenario'() {
+        expect: 'Events mapped correctly'
+            assert mappedCloudEvent == (CloudEventMapper.toTargetEvent(testCloudEvent(), targetClass) != null)
+        where: 'below are the scenarios'
+            scenario                | targetClass                                   || mappedCloudEvent
+            'valid concrete type'   | CmNotificationSubscriptionNcmpInEvent.class   || true
+            'invalid concrete type' | ArrayList.class                               || false
+    }
+
+    def testCloudEvent() {
+        return CloudEventBuilder.v1().withData(objectMapper.writeValueAsBytes(new CmNotificationSubscriptionNcmpInEvent()))
+                .withId("cmhandle1")
+                .withSource(URI.create('test-source'))
+                .withDataSchema(URI.create('test'))
+                .withType('org.onap.cm.events.cm-subscription')
+                .build()
+    }
+}
diff --git a/src/test/resources/avcSubscriptionCreationEvent.json b/src/test/resources/avcSubscriptionCreationEvent.json
deleted file mode 100644
index 8fa1004..0000000
--- a/src/test/resources/avcSubscriptionCreationEvent.json
+++ /dev/null
@@ -1,39 +0,0 @@
-{
-  "data": {
-    "dataType": {
-      "dataCategory": "CM",
-      "dataProvider": "CM-SERVICE",
-      "dataspace": "ALL"
-    },
-    "predicates": {
-      "datastore": "passthrough-running",
-      "targets": [
-        {
-          "id": "CMHandle1",
-          "additional-properties": {
-            "prop1": "prop-value"
-          }
-        },
-        {
-          "id": "CMHandle2",
-          "additional-properties": {
-            "prop-x": "prop-valuex",
-            "prop-p": "prop-valuep"
-          }
-        },
-        {
-          "id": "CMHandle3",
-          "additional-properties": {
-            "prop-y": "prop-valuey"
-          }
-        }
-      ],
-      "datastore-xpath-filter": "//_3gpp-nr-nrm-gnbdufunction:GNBDUFunction/_3gpp-nr-nrm-nrcelldu:NRCellDU/ | //_3gpp-nr-nrm-gnbcuupfunction:GNBCUUPFunction// | //_3gpp-nr-nrm-gnbcucpfunction:GNBCUCPFunction/_3gpp-nr-nrm-nrcelldu:NRCellCU// | //_3gpp-nr-nrm-nrsectorcarrier:NRSectorCarrier//"
-    },
-    "subscription": {
-      "clientID": "SCO-9989752",
-      "name": "cm-subscription-001",
-      "isTagged": false
-    }
-  }
-}
\ No newline at end of file
diff --git a/src/test/resources/cmNotificationSubscriptionCreationEvent.json b/src/test/resources/cmNotificationSubscriptionCreationEvent.json
new file mode 100644
index 0000000..3b78097
--- /dev/null
+++ b/src/test/resources/cmNotificationSubscriptionCreationEvent.json
@@ -0,0 +1,43 @@
+{
+  "data": {
+    "cmhandles": [
+      {
+        "cmhandleId": "CMHandle1",
+        "private-properties": {
+          "prop1": "prop-value"
+        }
+      },
+      {
+        "cmhandleId": "CMHandle2",
+        "private-properties": {
+          "prop-x": "prop-valuex",
+          "prop-p": "prop-valuep"
+        }
+      },
+      {
+        "cmhandleId": "CMHandle3",
+        "private-properties": {
+          "prop-y": "prop-valuey"
+        }
+      }
+    ],
+    "predicates": [
+      {
+        "targetFilter": [
+          "CMHandle1",
+          "CMHandle2",
+          "CMHandle3"
+        ],
+        "scopeFilter": {
+          "datastore": "ncmp-datastore:passthrough-running",
+          "xpath-filter": [
+            "//_3gpp-nr-nrm-gnbdufunction:GNBDUFunction/_3gpp-nr-nrm-nrcelldu:NRCellDU/",
+            "//_3gpp-nr-nrm-gnbcuupfunction:GNBCUUPFunction//",
+            "//_3gpp-nr-nrm-gnbcucpfunction:GNBCUCPFunction/_3gpp-nr-nrm-nrcelldu:NRCellCU//",
+            "//_3gpp-nr-nrm-nrsectorcarrier:NRSectorCarrier//"
+          ]
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file