Async request response NCMP -> Client

-Added consumer for DMI events and producer for forwarding to client
-Added schemas for events
-Updated tests
-Added new module for ncmp events
-Used mapstruct for event mapping

Issue-ID: CPS-830
Change-Id: I096d08af9d69092cf8651e11eaa00ce441fc3605
Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
Signed-off-by: JosephKeenan <joseph.keenan@est.tech>
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
Signed-off-by: JosephKeenan <joseph.keenan@est.tech>
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml
index 7ad22e9..692996c 100644
--- a/cps-application/src/main/resources/application.yml
+++ b/cps-application/src/main/resources/application.yml
@@ -70,6 +70,19 @@
         producer:

             value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

             client-id: cps-core

+        consumer:

+            group-id: ${NCMP_CONSUMER_GROUP_ID:ncmp-group}

+            key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer

+            value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer

+            properties:

+                spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

+                spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer

+                spring.json.value.default.type: org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent

+                spring.json.use.type.headers: false

+app:

+    ncmp:

+        async-m2m:

+            topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m}

 

 notification:

     data-updated:

@@ -85,7 +98,7 @@
             queue-capacity: 500

             wait-for-tasks-to-complete-on-shutdown: true

             thread-name-prefix: Async-

-

+            time-out-value-in-ms: 2000

 

 springdoc:

     swagger-ui:

diff --git a/cps-bom/pom.xml b/cps-bom/pom.xml
index e468926..f2fcb6e 100644
--- a/cps-bom/pom.xml
+++ b/cps-bom/pom.xml
@@ -2,7 +2,7 @@
 <!--
   ============LICENSE_START=======================================================
   Copyright (C) 2020 Pantheon.tech
-  Modifications Copyright (C) 2021 Nordix Foundation
+  Modifications Copyright (C) 2021 - 2022 Nordix Foundation
   Modifications Copyright (C) 2021 Bell Canada.
   ================================================================================
   Licensed under the Apache License, Version 2.0 (the "License");
@@ -101,6 +101,11 @@
             </dependency>
             <dependency>
                 <groupId>${project.groupId}</groupId>
+                <artifactId>cps-ncmp-events</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
                 <artifactId>checkstyle</artifactId>
                 <version>${project.version}</version>
             </dependency>
diff --git a/cps-ncmp-events/pom.xml b/cps-ncmp-events/pom.xml
new file mode 100644
index 0000000..2d49a4c
--- /dev/null
+++ b/cps-ncmp-events/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ============LICENSE_START=======================================================
+  Copyright (c) 2022 Nordix Foundation.
+  ================================================================================
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  ============LICENSE_END=========================================================
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.onap.cps</groupId>
+        <artifactId>cps-parent</artifactId>
+        <version>3.1.0-SNAPSHOT</version>
+        <relativePath>../cps-parent/pom.xml</relativePath>
+    </parent>
+
+    <artifactId>cps-ncmp-events</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <sourceDirectory>${basedir}/src/main/resources/schemas</sourceDirectory>
+                    <targetPackage>org.onap.cps.ncmp.event.model</targetPackage>
+                    <generateBuilders>true</generateBuilders>
+                    <serializable>true</serializable>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/cps-ncmp-events/src/main/resources/schemas/dmi-async-request-response-event-schema-v1.json b/cps-ncmp-events/src/main/resources/schemas/dmi-async-request-response-event-schema-v1.json
new file mode 100644
index 0000000..528c063
--- /dev/null
+++ b/cps-ncmp-events/src/main/resources/schemas/dmi-async-request-response-event-schema-v1.json
@@ -0,0 +1,87 @@
+{
+  "$schema": "https://json-schema.org/draft/2019-09/schema",
+  "$id": "urn:cps:org.onap.cps.ncmp.events:dmi-async-request-response-event-schema:v1",
+  "$ref": "#/definitions/DmiAsyncRequestResponseEvent",
+  "definitions": {
+    "DmiAsyncRequestResponseEvent": {
+      "description": "The payload for NCMP async request response event.",
+      "type": "object",
+      "properties": {
+        "eventId": {
+          "description": "The unique id identifying the event generated by DMI.",
+          "type": "string"
+        },
+        "eventCorrelationId": {
+          "description": "The request id passed by NCMP.",
+          "type": "string"
+        },
+        "eventTime": {
+          "description": "The time of the event. The expected format is 'yyyy-MM-dd'T'HH:mm:ss.SSSZ'.",
+          "type": "string"
+        },
+        "eventTarget": {
+          "description": "The target of the event.",
+          "type": "string"
+        },
+        "eventType": {
+          "description": "The type of the event.",
+          "type": "string"
+        },
+        "eventSchema": {
+          "description": "The event schema for async request response events.",
+          "type": "string"
+        },
+        "eventSource": {
+          "description": "The source of the event.",
+          "type": "string"
+        },
+        "eventContent": {
+          "$ref": "#/definitions/Event-Content"
+        }
+      },
+      "required": [
+        "eventId",
+        "eventCorrelationId",
+        "eventTime",
+        "eventTarget",
+        "eventType",
+        "eventSchema",
+        "eventSource",
+        "eventContent"
+      ]
+    },
+    "Event-Content": {
+      "description": "The event content.",
+      "type": "object",
+      "properties": {
+        "response-data-schema": {
+          "description": "The schema of response data",
+          "type": "string"
+        },
+        "response-status": {
+          "description": "The status of the response.",
+          "type": "string"
+        },
+        "response-code": {
+          "description": "The code of the response.",
+          "type": "string"
+        },
+        "response-data": {
+          "description": "The data payload",
+          "type": "object",
+          "properties": {
+            "payload": {
+              "type": "object"
+            }
+          }
+        },
+        "required": [
+          "response-data-schema",
+          "response-status",
+          "response-code",
+          "response-data"
+        ]
+      }
+    }
+  }
+}
diff --git a/cps-ncmp-events/src/main/resources/schemas/ncmp-async-request-response-event-schema-v1.json b/cps-ncmp-events/src/main/resources/schemas/ncmp-async-request-response-event-schema-v1.json
new file mode 100644
index 0000000..3fd15bd
--- /dev/null
+++ b/cps-ncmp-events/src/main/resources/schemas/ncmp-async-request-response-event-schema-v1.json
@@ -0,0 +1,187 @@
+{
+  "$schema": "https://json-schema.org/draft/2019-09/schema",
+  "$id": "urn:cps:org.onap.cps.ncmp.events:ncmp-async-request-response-event-schema:v1",
+  "$ref": "#/definitions/NcmpAsyncRequestResponseEvent",
+  "definitions": {
+    "NcmpAsyncRequestResponseEvent": {
+      "description": "The payload for CPS async request response event.",
+      "type": "object",
+      "properties": {
+        "eventId": {
+          "description": "The unique id identifying the event generated by DMI.",
+          "type": "string"
+        },
+        "eventCorrelationId": {
+          "description": "The request id passed by NCMP.",
+          "type": "string"
+        },
+        "eventTime": {
+          "description": "The time of the event. The expected format is 'yyyy-MM-dd'T'HH:mm:ss.SSSZ'.",
+          "type": "string"
+        },
+        "eventTarget": {
+          "description": "The target of the event.",
+          "type": "string"
+        },
+        "eventType": {
+          "description": "The type of the event.",
+          "type": "string"
+        },
+        "eventSchema": {
+          "description": "The event schema for async request response events.",
+          "type": "string"
+        },
+        "event": {
+        "$ref": "#/definitions/Event"
+        },
+        "forwardedEvent": {
+          "$ref": "#/definitions/Forwarded-Event"
+        }
+      },
+      "required": [
+        "eventId",
+        "eventCorrelationId",
+        "eventTime",
+        "eventTarget",
+        "eventType",
+        "eventSchema"
+      ]
+    },
+    "Forwarded-Event": {
+      "description": "The event content.",
+      "type": "object",
+      "properties": {
+        "eventId": {
+          "description": "The unique id identifying the event generated by DMI.",
+          "type": "string"
+        },
+        "eventCorrelationId": {
+          "description": "The request id passed by NCMP.",
+          "type": "string"
+        },
+        "eventTime": {
+          "description": "The time of the event. The expected format is 'yyyy-MM-dd'T'HH:mm:ss.SSSZ'.",
+          "type": "string"
+        },
+        "eventTarget": {
+          "description": "The target of the event.",
+          "type": "string"
+        },
+        "eventType": {
+          "description": "The type of the event.",
+          "type": "string"
+        },
+        "eventSchema": {
+          "description": "The event schema for async request response events.",
+          "type": "string"
+        },
+        "eventSource": {
+          "description": "The source of the event.",
+          "type": "string"
+        },
+        "response-data-schema": {
+          "description": "The received schema of response data",
+          "type": "string"
+        },
+        "response-status": {
+          "description": "The received status of the response.",
+          "type": "string"
+        },
+        "response-code": {
+          "description": "The received code of the response.",
+          "type": "string"
+        },
+        "forwardedEventData": {
+          "description": "The data payload",
+          "type": "object",
+          "properties": {
+            "forwardedEventPayload": {
+              "type": "object"
+            }
+          }
+        },
+        "required": [
+          "eventId",
+          "eventCorrelationId",
+          "eventTime",
+          "eventTarget",
+          "eventType",
+          "eventSchema",
+          "eventSource",
+          "response-data-schema",
+          "response-status",
+          "response-code",
+          "forwardedEventData"
+        ]
+      }
+    },
+    "Event": {
+      "description": "The event content.",
+      "type": "object",
+      "properties": {
+        "eventId": {
+          "description": "The unique id identifying the event generated by DMI",
+          "type": "string"
+        },
+        "eventCorrelationId": {
+          "description": "The request id passed by NCMP.",
+          "type": "string"
+        },
+        "eventTime": {
+          "description": "The time of the event. The expected format is 'yyyy-MM-dd'T'HH:mm:ss.SSSZ'.",
+          "type": "string"
+        },
+        "eventTarget": {
+          "description": "The target of the event.",
+          "type": "string"
+        },
+        "eventType": {
+          "description": "The type of the event.",
+          "type": "string"
+        },
+        "eventSchema": {
+          "description": "The event schema for async request response events.",
+          "type": "string"
+        },
+        "eventSource": {
+          "description": "The source of the event.",
+          "type": "string"
+        },
+        "response-data-schema": {
+          "description": "The received schema of response data",
+          "type": "string"
+        },
+        "response-status": {
+          "description": "The received status of the response.",
+          "type": "string"
+        },
+        "response-code": {
+          "description": "The received code of the response.",
+          "type": "string"
+        },
+        "response-data": {
+          "description": "The data payload",
+          "type": "object",
+          "properties": {
+            "payload": {
+              "type": "object"
+            }
+          }
+        },
+        "required": [
+          "eventId",
+          "eventCorrelationId",
+          "eventTarget",
+          "eventTime",
+          "eventType",
+          "eventSchema",
+          "eventSource",
+          "response-data-schema",
+          "response-status",
+          "response-code",
+          "event-data"
+        ]
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java
index cedc946..11517bc 100755
--- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java
@@ -46,6 +46,7 @@
 import org.onap.cps.ncmp.api.models.CmHandleQueryApiParameters;
 import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle;
 import org.onap.cps.ncmp.rest.api.NetworkCmProxyApi;
+import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor;
 import org.onap.cps.ncmp.rest.mapper.RestOutputCmHandleStateMapper;
 import org.onap.cps.ncmp.rest.model.CmHandleProperties;
 import org.onap.cps.ncmp.rest.model.CmHandleProperty;
@@ -61,6 +62,7 @@
 import org.onap.cps.ncmp.rest.model.RestOutputCmHandlePublicProperties;
 import org.onap.cps.utils.CpsValidator;
 import org.onap.cps.utils.JsonObjectMapper;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.RequestMapping;
@@ -75,12 +77,14 @@
     private static final String NO_BODY = null;
     private static final String NO_REQUEST_ID = null;
     private static final String NO_TOPIC = null;
-    public static final String ASYNC_REQUEST_ID = "requestId";
-
     private final NetworkCmProxyDataService networkCmProxyDataService;
     private final JsonObjectMapper jsonObjectMapper;
     private final NcmpRestInputMapper ncmpRestInputMapper;
     private final RestOutputCmHandleStateMapper restOutputCmHandleStateMapper;
+    private final CpsNcmpTaskExecutor cpsNcmpTaskExecutor;
+
+    @Value("${notification.async.executor.time-out-value-in-ms:2000}")
+    private int timeOutInMilliSeconds;
 
     /**
      * Get resource data from operational datastore.
@@ -96,19 +100,21 @@
                                                                         final @NotNull @Valid String resourceIdentifier,
                                                                         final @Valid String optionsParamInQuery,
                                                                         final @Valid String topicParamInQuery) {
-        final ResponseEntity<Map<String, Object>> asyncResponse = populateAsyncResponse(topicParamInQuery);
-        final Map<String, Object> asyncResponseData = asyncResponse.getBody();
-
-        final Object responseObject = networkCmProxyDataService.getResourceDataOperationalForCmHandle(cmHandle,
-                resourceIdentifier,
-                optionsParamInQuery,
-                asyncResponseData == null ? NO_TOPIC : topicParamInQuery,
-                asyncResponseData == null ? NO_REQUEST_ID : asyncResponseData.get(ASYNC_REQUEST_ID).toString());
-
-        if (asyncResponseData == null) {
-            return ResponseEntity.ok(responseObject);
+        if (isValidTopic(topicParamInQuery)) {
+            final String requestId = UUID.randomUUID().toString();
+            cpsNcmpTaskExecutor.executeTask(() ->
+                networkCmProxyDataService.getResourceDataOperationalForCmHandle(
+                    cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery,
+                        requestId
+                ), timeOutInMilliSeconds
+            );
+            return acknowledgeAsyncRequest(requestId);
         }
-        return ResponseEntity.ok(asyncResponse);
+
+        final Object responseObject = networkCmProxyDataService.getResourceDataOperationalForCmHandle(
+            cmHandle, resourceIdentifier, optionsParamInQuery, NO_TOPIC, NO_REQUEST_ID);
+
+        return ResponseEntity.ok(responseObject);
     }
 
     /**
@@ -125,19 +131,21 @@
                                                                     final @NotNull @Valid String resourceIdentifier,
                                                                     final @Valid String optionsParamInQuery,
                                                                     final @Valid String topicParamInQuery) {
-        final ResponseEntity<Map<String, Object>> asyncResponse = populateAsyncResponse(topicParamInQuery);
-        final Map<String, Object> asyncResponseData = asyncResponse.getBody();
-
-        final Object responseObject = networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle(cmHandle,
-                resourceIdentifier,
-                optionsParamInQuery,
-                asyncResponseData == null ? NO_TOPIC : topicParamInQuery,
-                asyncResponseData == null ? NO_REQUEST_ID : asyncResponseData.get(ASYNC_REQUEST_ID).toString());
-
-        if (asyncResponseData == null) {
-            return ResponseEntity.ok(responseObject);
+        if (isValidTopic(topicParamInQuery)) {
+            final String resourceDataRequestId = UUID.randomUUID().toString();
+            cpsNcmpTaskExecutor.executeTask(() ->
+                networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle(
+                    cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery,
+                        resourceDataRequestId
+                ), timeOutInMilliSeconds
+            );
+            return acknowledgeAsyncRequest(resourceDataRequestId);
         }
-        return ResponseEntity.ok(asyncResponse);
+
+        final Object responseObject = networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle(
+            cmHandle, resourceIdentifier, optionsParamInQuery, NO_TOPIC, NO_REQUEST_ID);
+
+        return ResponseEntity.ok(responseObject);
     }
 
     @Override
@@ -319,18 +327,7 @@
         return restOutputCmHandle;
     }
 
-    private ResponseEntity<Map<String, Object>> populateAsyncResponse(final String topicParamInQuery) {
-        final boolean processAsynchronously = hasTopicParameter(topicParamInQuery);
-        final Map<String, Object> responseData;
-        if (processAsynchronously) {
-            responseData = getAsyncResponseData();
-        } else {
-            responseData = null;
-        }
-        return ResponseEntity.ok().body(responseData);
-    }
-
-    private static boolean hasTopicParameter(final String topicName) {
+    private static boolean isValidTopic(final String topicName) {
         if (topicName == null) {
             return false;
         }
@@ -340,11 +337,11 @@
         throw new InvalidTopicException("Topic name " + topicName + " is invalid", "invalid topic");
     }
 
-    private Map<String, Object> getAsyncResponseData() {
-        final Map<String, Object> asyncResponseData = new HashMap<>(1);
-        final String resourceDataRequestId = UUID.randomUUID().toString();
-        asyncResponseData.put(ASYNC_REQUEST_ID, resourceDataRequestId);
-        return asyncResponseData;
+    private ResponseEntity<Object> acknowledgeAsyncRequest(final String requestId) {
+        final Map<String, Object> acknowledgeData = new HashMap<>(1);
+        acknowledgeData.put("requestId", requestId);
+        return ResponseEntity.ok(acknowledgeData);
     }
 
 }
+
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/CpsTaskExecutionException.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/CpsTaskExecutionException.java
new file mode 100644
index 0000000..3e8929d
--- /dev/null
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/CpsTaskExecutionException.java
@@ -0,0 +1,44 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2022 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.rest.exceptions;
+
+import lombok.Getter;
+
+public class CpsTaskExecutionException extends RuntimeException {
+
+    private static final long serialVersionUID = 1481520410918497454L;
+
+    @Getter
+    final String details;
+
+    /**
+     * Constructor.
+     *
+     * @param message the error message
+     * @param details the error details
+     * @param cause   the cause of the exception
+     */
+    public CpsTaskExecutionException(final String message, final String details, final Throwable cause) {
+        super(message, cause);
+        this.details = details;
+    }
+
+}
\ No newline at end of file
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java
new file mode 100644
index 0000000..93aa285
--- /dev/null
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/executor/CpsNcmpTaskExecutor.java
@@ -0,0 +1,60 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2022 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.rest.executor;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+public class CpsNcmpTaskExecutor {
+
+    /**
+     * Execute task asynchronously and publish response to supplied topic.
+     *
+     * @param taskSupplier functional method is get() task need to executed asynchronously
+     * @param timeOutInMillis the time out value in milliseconds
+     */
+    public void executeTask(final Supplier<Object> taskSupplier, final int timeOutInMillis) {
+        CompletableFuture.supplyAsync(taskSupplier::get)
+            .orTimeout(timeOutInMillis, MILLISECONDS)
+            .whenCompleteAsync(
+                (responseAsJson, throwable) -> {
+                    handleTaskCompletion(throwable);
+                }
+            );
+    }
+
+    private void handleTaskCompletion(final Throwable throwable) {
+        if (throwable == null) {
+            log.info("Async task completed successfully.");
+        } else {
+            log.error("Async task failed. caused by : {}", throwable.getMessage());
+        }
+    }
+}
+
+
+
diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy
index 6cf1506..3315304 100644
--- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy
+++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy
@@ -28,6 +28,7 @@
 import org.onap.cps.ncmp.api.inventory.CompositeState
 import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle
 import org.onap.cps.ncmp.rest.mapper.RestOutputCmHandleStateMapper
+import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor
 import spock.lang.Shared
 
 import java.time.OffsetDateTime
@@ -83,8 +84,8 @@
     @SpringBean
     RestOutputCmHandleStateMapper restOutputCmHandleStateMapper = Mappers.getMapper(RestOutputCmHandleStateMapper)
 
-    def formattedDateAndTime = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
-        .format(OffsetDateTime.of(2022, 12, 31, 20, 30, 40, 1, ZoneOffset.UTC))
+    @SpringBean
+    CpsNcmpTaskExecutor spiedCpsTaskExecutor = Spy()
 
     @Value('${rest.api.ncmp-base-path}/v1')
     def ncmpBasePathV1
@@ -95,6 +96,9 @@
     def NO_TOPIC = null
     def NO_REQUEST_ID = null
 
+    def formattedDateAndTime = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
+        .format(OffsetDateTime.of(2022, 12, 31, 20, 30, 40, 1, ZoneOffset.UTC))
+
     def 'Get Resource Data from pass-through operational.'() {
         given: 'resource data url'
             def getUrl = "$ncmpBasePathV1/ch/testCmHandle/data/ds/ncmp-datastore:passthrough-operational" +
@@ -120,34 +124,40 @@
                     "?resourceIdentifier=parent/child&options=(a=1,b=2)${topicQueryParam}"
         when: 'get data resource request is performed'
             def response = mvc.perform(
-                    get(getUrl)
-                    .contentType(MediaType.APPLICATION_JSON)
-            ).andReturn().response
-        then: 'the NCMP data service is called with operational data for cm handle'
-            expectedNumberOfMethodExecutions
-                    * mockNetworkCmProxyDataService."${expectedMethodName}"('testCmHandle',
-                    'parent/child',
-                    '(a=1,b=2)',
-                    expectedTopicName,
-                    _)
-        then: 'response status is expected'
-            response.status == expectedHttpStatus
+                    get(getUrl).contentType(MediaType.APPLICATION_JSON)).andReturn().response
+        then: 'task executor is called appropriate number of times'
+            expectedNumberOfExecutorExecutions * spiedCpsTaskExecutor.executeTask(_, 2000)
+        and: 'response status is expected'
+            response.status == HttpStatus.OK.value()
         where: 'the following parameters are used'
-            scenario                               | datastoreInUrl            | topicQueryParam        || expectedTopicName | expectedMethodName                             | expectedNumberOfMethodExecutions | expectedHttpStatus
-            'url with valid topic'                 | 'passthrough-operational' | '&topic=my-topic-name' || 'my-topic-name'   | 'getResourceDataOperationalForCmHandle'        | 1                                | HttpStatus.OK.value()
-            'no topic in url'                      | 'passthrough-operational' | ''                     || NO_TOPIC          | 'getResourceDataOperationalForCmHandle'        | 1                                | HttpStatus.OK.value()
-            'null topic in url'                    | 'passthrough-operational' | '&topic=null'          || 'null'            | 'getResourceDataOperationalForCmHandle'        | 1                                | HttpStatus.OK.value()
-            'empty topic in url'                   | 'passthrough-operational' | '&topic=\"\"'          || null              | 'getResourceDataOperationalForCmHandle'        | 0                                | HttpStatus.BAD_REQUEST.value()
-            'missing topic in url'                 | 'passthrough-operational' | '&topic='              || null              | 'getResourceDataOperationalForCmHandle'        | 0                                | HttpStatus.BAD_REQUEST.value()
-            'blank topic value in url'             | 'passthrough-operational' | '&topic=\" \"'         || null              | 'getResourceDataOperationalForCmHandle'        | 0                                | HttpStatus.BAD_REQUEST.value()
-            'invalid non-empty topic value in url' | 'passthrough-operational' | '&topic=1_5_*_#'       || null              | 'getResourceDataOperationalForCmHandle'        | 0                                | HttpStatus.BAD_REQUEST.value()
-            'url with valid topic'                 | 'passthrough-running'     | '&topic=my-topic-name' || 'my-topic-name'   | 'getResourceDataPassThroughRunningForCmHandle' | 1                                | HttpStatus.OK.value()
-            'no topic in url'                      | 'passthrough-running'     | ''                     || NO_TOPIC          | 'getResourceDataPassThroughRunningForCmHandle' | 1                                | HttpStatus.OK.value()
-            'null topic in url'                    | 'passthrough-running'     | '&topic=null'          || 'null'            | 'getResourceDataPassThroughRunningForCmHandle' | 1                                | HttpStatus.OK.value()
-            'empty topic in url'                   | 'passthrough-running'     | '&topic=\"\"'          || null              | 'getResourceDataPassThroughRunningForCmHandle' | 0                                | HttpStatus.BAD_REQUEST.value()
-            'missing topic in url'                 | 'passthrough-running'     | '&topic='              || null              | 'getResourceDataPassThroughRunningForCmHandle' | 0                                | HttpStatus.BAD_REQUEST.value()
-            'blank topic value in url'             | 'passthrough-running'     | '&topic=\" \"'         || null              | 'getResourceDataPassThroughRunningForCmHandle' | 0                                | HttpStatus.BAD_REQUEST.value()
-            'invalid non-empty topic value in url' | 'passthrough-running'     | '&topic=1_5_*_#'       || null              | 'getResourceDataPassThroughRunningForCmHandle' | 0                                | HttpStatus.BAD_REQUEST.value()
+            scenario                               | datastoreInUrl            | topicQueryParam        || expectedTopicName | expectedNumberOfExecutorExecutions
+            'url with valid topic'                 | 'passthrough-operational' | '&topic=my-topic-name' || 'my-topic-name'   | 1
+            'no topic in url'                      | 'passthrough-operational' | ''                     || NO_TOPIC          | 0
+            'null topic in url'                    | 'passthrough-operational' | '&topic=null'          || 'null'            | 1
+            'url with valid topic'                 | 'passthrough-running'     | '&topic=my-topic-name' || 'my-topic-name'   | 1
+            'no topic in url'                      | 'passthrough-running'     | ''                     || NO_TOPIC          | 0
+            'null topic in url'                    | 'passthrough-running'     | '&topic=null'          || 'null'            | 1
+    }
+
+    def 'Fail to get Resource Data from #datastoreInUrl when #scenario.'() {
+        given: 'resource data url'
+            def getUrl = "$ncmpBasePathV1/ch/testCmHandle/data/ds/ncmp-datastore:${datastoreInUrl}" +
+                "?resourceIdentifier=parent/child&options=(a=1,b=2)${topicQueryParam}"
+        when: 'get data resource request is performed'
+            def response = mvc.perform(
+                get(getUrl).contentType(MediaType.APPLICATION_JSON)).andReturn().response
+        then: 'abad request is returned'
+            response.status == HttpStatus.BAD_REQUEST.value()
+        where: 'the following parameters are used'
+            scenario                               | datastoreInUrl            | topicQueryParam
+            'empty topic in url'                   | 'passthrough-operational' | '&topic=\"\"'
+            'missing topic in url'                 | 'passthrough-operational' | '&topic='
+            'blank topic value in url'             | 'passthrough-operational' | '&topic=\" \"'
+            'invalid non-empty topic value in url' | 'passthrough-operational' | '&topic=1_5_*_#'
+            'empty topic in url'                   | 'passthrough-running'     | '&topic=\"\"'
+            'missing topic in url'                 | 'passthrough-running'     | '&topic='
+            'blank topic value in url'             | 'passthrough-running'     | '&topic=\" \"'
+            'invalid non-empty topic value in url' | 'passthrough-running'     | '&topic=1_5_*_#'
     }
 
     def 'Get Resource Data from pass-through running with #scenario value in resource identifier param.'() {
diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy
index 1258e6e..fd3203b 100644
--- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy
+++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy
@@ -30,6 +30,7 @@
 import org.onap.cps.ncmp.api.impl.exception.ServerNcmpException
 import org.onap.cps.ncmp.rest.controller.NcmpRestInputMapper
 import org.onap.cps.ncmp.rest.mapper.RestOutputCmHandleStateMapper
+import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor
 import org.onap.cps.spi.exceptions.CpsException
 import org.onap.cps.spi.exceptions.DataNodeNotFoundException
 import org.onap.cps.spi.exceptions.DataValidationException
@@ -62,7 +63,7 @@
     NetworkCmProxyDataService mockNetworkCmProxyDataService = Mock()
 
     @SpringBean
-    JsonObjectMapper jsonObjectMapper = Stub()
+    JsonObjectMapper stubbedJsonObjectMapper = Stub()
 
     @SpringBean
     NcmpRestInputMapper ncmpRestInputMapper = Mappers.getMapper(NcmpRestInputMapper)
@@ -70,6 +71,9 @@
     @SpringBean
     RestOutputCmHandleStateMapper restOutputCmHandleStateMapper = Mappers.getMapper(RestOutputCmHandleStateMapper)
 
+    @SpringBean
+    CpsNcmpTaskExecutor stubbedCpsTaskExecutor = Stub()
+
     @Value('${rest.api.ncmp-base-path}')
     def basePathNcmp
 
diff --git a/cps-ncmp-rest/src/test/resources/application.yml b/cps-ncmp-rest/src/test/resources/application.yml
index f2ca8c7..0241696 100644
--- a/cps-ncmp-rest/src/test/resources/application.yml
+++ b/cps-ncmp-rest/src/test/resources/application.yml
@@ -21,3 +21,8 @@
     api:
         ncmp-base-path: /ncmp
         ncmp-inventory-base-path: /ncmpInventory
+
+notification:
+    async:
+        executor:
+            time-out-value-in-ms: 2000
\ No newline at end of file
diff --git a/cps-ncmp-service/pom.xml b/cps-ncmp-service/pom.xml
index 573c76e..45112de 100644
--- a/cps-ncmp-service/pom.xml
+++ b/cps-ncmp-service/pom.xml
@@ -38,6 +38,23 @@
             <artifactId>cps-service</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.onap.cps</groupId>
+            <artifactId>cps-ncmp-events</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.mapstruct</groupId>
+            <artifactId>mapstruct</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.mapstruct</groupId>
+            <artifactId>mapstruct-processor</artifactId>
+        </dependency>
+        <!-- T E S T - D E P E N D E N C I E S -->
+        <dependency>
             <groupId>org.spockframework</groupId>
             <artifactId>spock-core</artifactId>
             <scope>test</scope>
@@ -48,6 +65,16 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>spock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-test</artifactId>
             <scope>test</scope>
@@ -58,13 +85,5 @@
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.springframework</groupId>
-            <artifactId>spring-web</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-validation</artifactId>
-        </dependency>
     </dependencies>
 </project>
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventConsumer.java
new file mode 100644
index 0000000..4e5c57b
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventConsumer.java
@@ -0,0 +1,55 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.async;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
+import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * Listener for cps-ncmp async request response events.
+ */
+@Component
+@Slf4j
+@RequiredArgsConstructor
+public class NcmpAsyncRequestResponseEventConsumer {
+
+    private final NcmpAsyncRequestResponseEventProducer ncmpAsyncRequestResponseEventProducer;
+    private final NcmpAsyncRequestResponseEventMapper ncmpAsyncRequestResponseEventMapper;
+
+    /**
+     * Consume the specified event.
+     *
+     * @param dmiAsyncRequestResponseEvent the event to be consumed and produced.
+     */
+    @KafkaListener(topics = "${app.ncmp.async-m2m.topic}")
+    public void consumeAndForward(final DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent) {
+        log.debug("Consuming event {} ...", dmiAsyncRequestResponseEvent);
+
+        final NcmpAsyncRequestResponseEvent ncmpAsyncRequestResponseEvent =
+                ncmpAsyncRequestResponseEventMapper.toNcmpAsyncEvent(dmiAsyncRequestResponseEvent);
+        ncmpAsyncRequestResponseEventProducer.sendMessage(
+                ncmpAsyncRequestResponseEvent.getEventId(), ncmpAsyncRequestResponseEvent);
+    }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventMapper.java
new file mode 100644
index 0000000..5d8ac7f
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventMapper.java
@@ -0,0 +1,71 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.async;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.UUID;
+import org.mapstruct.AfterMapping;
+import org.mapstruct.Mapper;
+import org.mapstruct.Mapping;
+import org.mapstruct.MappingTarget;
+import org.mapstruct.Named;
+import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
+import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent;
+
+/**
+ * Mapper for converting DmiAsyncRequestResponseEvent to NcmpAsyncRequestResponseEvent.
+ */
+@Mapper(componentModel = "spring")
+public interface NcmpAsyncRequestResponseEventMapper {
+
+    @Mapping(source = "eventId", target = "eventId", qualifiedByName = "ncmpAsyncEventId")
+    @Mapping(source = "eventTime", target = "eventTime", qualifiedByName = "currentTime")
+    @Mapping(source = "eventId", target = "forwardedEvent.eventId")
+    @Mapping(source = "eventCorrelationId", target = "forwardedEvent.eventCorrelationId")
+    @Mapping(source = "eventSchema", target = "forwardedEvent.eventSchema")
+    @Mapping(source = "eventSource", target = "forwardedEvent.eventSource")
+    @Mapping(source = "eventTarget", target = "forwardedEvent.eventTarget")
+    @Mapping(source = "eventTime", target = "forwardedEvent.eventTime")
+    @Mapping(source = "eventType", target = "forwardedEvent.eventType")
+    @Mapping(source = "eventContent.responseStatus", target = "forwardedEvent.responseStatus")
+    @Mapping(source = "eventContent.responseCode", target = "forwardedEvent.responseCode")
+    @Mapping(source = "eventContent.responseDataSchema", target = "forwardedEvent.responseDataSchema")
+    NcmpAsyncRequestResponseEvent toNcmpAsyncEvent(DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent);
+
+    @Named("ncmpAsyncEventId")
+    static String getNcmpAsyncEventId(String eventId) {
+        return UUID.randomUUID().toString();
+    }
+
+    @Named("currentTime")
+    static String getFormattedCurrentTime(String eventTime) {
+        return ZonedDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+    }
+
+    @AfterMapping
+    default void mapAdditionalProperties(DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent,
+                                         @MappingTarget NcmpAsyncRequestResponseEvent ncmpAsyncRequestResponseEvent) {
+        ncmpAsyncRequestResponseEvent.getForwardedEvent().setAdditionalProperty("response-data",
+                dmiAsyncRequestResponseEvent.getEventContent().getResponseData().getAdditionalProperties());
+    }
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventProducer.java
new file mode 100644
index 0000000..8ab6db9
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventProducer.java
@@ -0,0 +1,46 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.async;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class NcmpAsyncRequestResponseEventProducer {
+
+    private final KafkaTemplate<String, NcmpAsyncRequestResponseEvent> kafkaTemplate;
+
+
+    /**
+     * Sends message to the configured topic with a message key.
+     *
+     * @param eventId message key
+     * @param ncmpAsyncRequestResponseEvent    message payload
+     */
+    public void sendMessage(final String eventId, final NcmpAsyncRequestResponseEvent ncmpAsyncRequestResponseEvent) {
+        kafkaTemplate.send(ncmpAsyncRequestResponseEvent.getEventTarget(), eventId, ncmpAsyncRequestResponseEvent);
+    }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
new file mode 100644
index 0000000..aa6bf1a
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
@@ -0,0 +1,126 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2022 Nordix Foundation.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an 'AS IS' BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.async
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.StringSerializer
+import org.mapstruct.factory.Mappers
+import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
+import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent
+import org.springframework.kafka.core.DefaultKafkaProducerFactory
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.support.serializer.JsonSerializer
+import org.testcontainers.containers.KafkaContainer
+import org.testcontainers.spock.Testcontainers
+import org.testcontainers.utility.DockerImageName
+
+import java.time.Duration
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.onap.cps.utils.JsonObjectMapper
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.onap.cps.ncmp.utils.TestUtils;
+import org.springframework.boot.test.context.SpringBootTest
+import org.spockframework.spring.SpringBean
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.DynamicPropertyRegistry
+import org.springframework.test.context.DynamicPropertySource
+import spock.lang.Specification
+
+@SpringBootTest(classes = [NcmpAsyncRequestResponseEventProducer, NcmpAsyncRequestResponseEventConsumer])
+@Testcontainers
+@DirtiesContext
+class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends Specification {
+
+    static kafkaTestContainer = new KafkaContainer(
+        DockerImageName.parse('confluentinc/cp-kafka:6.2.1')
+    )
+
+    static {
+        Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop))
+    }
+
+    def setupSpec() {
+        kafkaTestContainer.start()
+    }
+
+    def producerConfigProperties = [
+        (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)      : kafkaTestContainer.getBootstrapServers().split(',')[0],
+        (ProducerConfig.RETRIES_CONFIG)                : 0,
+        (ProducerConfig.BATCH_SIZE_CONFIG)             : 16384,
+        (ProducerConfig.LINGER_MS_CONFIG)              : 1,
+        (ProducerConfig.BUFFER_MEMORY_CONFIG)          : 33554432,
+        (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)   : StringSerializer,
+        (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) : JsonSerializer
+    ]
+
+    def consumerConfigProperties = [
+        (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)       : kafkaTestContainer.getBootstrapServers().split(',')[0],
+        (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)  : StringDeserializer,
+        (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG): StringDeserializer,
+        (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)       : 'earliest',
+        (ConsumerConfig.GROUP_ID_CONFIG)                : 'test'
+    ]
+
+    def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties))
+
+    @SpringBean
+    NcmpAsyncRequestResponseEventProducer cpsAsyncRequestResponseEventProducerService =
+        new NcmpAsyncRequestResponseEventProducer(kafkaTemplate);
+
+    @SpringBean
+    NcmpAsyncRequestResponseEventMapper ncmpAsyncRequestResponseEventMapper =
+            Mappers.getMapper(NcmpAsyncRequestResponseEventMapper.class)
+
+    @SpringBean
+    NcmpAsyncRequestResponseEventConsumer ncmpAsyncRequestResponseEventConsumer =
+            new NcmpAsyncRequestResponseEventConsumer(cpsAsyncRequestResponseEventProducerService,
+                    ncmpAsyncRequestResponseEventMapper)
+
+    def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
+
+    def kafkaConsumer = new KafkaConsumer<>(getConsumerConfigProperties())
+
+    def 'Consume and forward valid message'() {
+        given: 'consumer has a subscription'
+            kafkaConsumer.subscribe(['test-topic'] as List<String>)
+        and: 'an event is sent'
+            def jsonData = TestUtils.getResourceFileContent('dmiAsyncRequestResponseEvent.json')
+            def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DmiAsyncRequestResponseEvent.class)
+        when: 'the event is consumed'
+            ncmpAsyncRequestResponseEventConsumer.consumeAndForward(testEventSent)
+        and: 'the topic is polled'
+            def records = kafkaConsumer.poll(Duration.ofMillis(1500))
+        then: 'poll returns one record'
+            assert records.size() == 1
+        and: 'consumed forwarded event id is the same as sent event id'
+            def record = records.iterator().next()
+            assert testEventSent.eventId.equalsIgnoreCase(jsonObjectMapper.convertJsonString(record.value(),
+                    NcmpAsyncRequestResponseEvent).getForwardedEvent().getEventId())
+    }
+
+    @DynamicPropertySource
+    static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
+        dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
+    }
+
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/DmiServiceUrlBuilderSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/DmiServiceUrlBuilderSpec.groovy
index 4c8dcac..964826b 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/DmiServiceUrlBuilderSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/DmiServiceUrlBuilderSpec.groovy
@@ -45,14 +45,14 @@
             def uriVars = objectUnderTest.populateUriVariables(yangModelCmHandle,
                     "cmHandle", PASSTHROUGH_RUNNING)
         and: 'query params'
-            def uriQueries = objectUnderTest.populateQueryParams(resourceId,
-                    'optionsParamInQuery', topicParamInQuery)
+                            def uriQueries = objectUnderTest.populateQueryParams(resourceId,
+                    'optionsParamInQuery', topic)
         when: 'a dmi datastore service url is generated'
             def dmiServiceUrl = objectUnderTest.getDmiDatastoreUrl(uriQueries, uriVars)
         then: 'service url is generated as expected'
             assert dmiServiceUrl == expectedDmiServiceUrl
         where: 'the following parameters are used'
-            scenario                       | topicParamInQuery   | resourceId   || expectedDmiServiceUrl
+            scenario                       | topic               | resourceId   || expectedDmiServiceUrl
             'With valid resourceId'        | 'topicParamInQuery' | 'resourceId' || 'dmiServiceName/dmi/v1/ch/cmHandle/data/ds/ncmp-datastore:passthrough-running?resourceIdentifier=resourceId&options=optionsParamInQuery&topic=topicParamInQuery'
             'With Empty resourceId'        | 'topicParamInQuery' | ''           || 'dmiServiceName/dmi/v1/ch/cmHandle/data/ds/ncmp-datastore:passthrough-running?options=optionsParamInQuery&topic=topicParamInQuery'
             'With Empty dmi base path'     | 'topicParamInQuery' | 'resourceId' || 'dmiServiceName/dmi/v1/ch/cmHandle/data/ds/ncmp-datastore:passthrough-running?resourceIdentifier=resourceId&options=optionsParamInQuery&topic=topicParamInQuery'
diff --git a/cps-ncmp-service/src/test/resources/dmiAsyncRequestResponseEvent.json b/cps-ncmp-service/src/test/resources/dmiAsyncRequestResponseEvent.json
new file mode 100644
index 0000000..bf6c86a
--- /dev/null
+++ b/cps-ncmp-service/src/test/resources/dmiAsyncRequestResponseEvent.json
@@ -0,0 +1,30 @@
+{
+  "eventId": "8dbfe0a7-3b28-4109-8fcb-9fbc9c37d56a",
+  "eventCorrelationId": "122ca20b-4f8c-4759-a2b4-f0b9456df204",
+  "eventTime": "2022-05-09T13:34:50.466+0000",
+  "eventSource": "org.onap.ncmp",
+  "eventSchema": "urn:cps:org.onap.cps:async-request-response-event-schema:v1",
+  "eventTarget": "test-topic",
+  "eventContent": {
+    "response-data-schema": "urn:cps:org.onap.cps:async-request-response-event-schema:v1",
+    "response-status": "SUCCESS",
+    "response-code": "200",
+    "response-data": {
+      "ietf-netconf-monitoring:netconf-state": {
+        "schemas": {
+          "schema": [
+            {
+              "identifier": "ietf-tls-server",
+              "version": "2016-11-02",
+              "format": "ietf-netconf-monitoring:yang",
+              "namespace": "urn:ietf:params:xml:ns:yang:ietf-tls-server",
+              "location": [
+                "NETCONF"
+              ]
+            }
+          ]
+        }
+      }
+    }
+  }
+}
diff --git a/cps-service/pom.xml b/cps-service/pom.xml
index b9d6268..1be45d1 100644
--- a/cps-service/pom.xml
+++ b/cps-service/pom.xml
@@ -1,165 +1,165 @@
-<?xml version="1.0" encoding="UTF-8"?>

-<!--

-  ============LICENSE_START=======================================================

-  Copyright (C) 2021-2022 Nordix Foundation

-  Modifications Copyright (C) 2021 Bell Canada.

-  Modifications Copyright (C) 2021 Pantheon.tech

-  ================================================================================

-  Licensed under the Apache License, Version 2.0 (the "License");

-  you may not use this file except in compliance with the License.

-  You may obtain a copy of the License at

-

-        http://www.apache.org/licenses/LICENSE-2.0

-

-  Unless required by applicable law or agreed to in writing, software

-  distributed under the License is distributed on an "AS IS" BASIS,

-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

-  See the License for the specific language governing permissions and

-  limitations under the License.

-

-  SPDX-License-Identifier: Apache-2.0

-  ============LICENSE_END=========================================================

--->

-

-<project xmlns="http://maven.apache.org/POM/4.0.0"

-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

-  <modelVersion>4.0.0</modelVersion>

-  <parent>

-    <groupId>org.onap.cps</groupId>

-    <artifactId>cps-parent</artifactId>

-    <version>3.1.0-SNAPSHOT</version>

-    <relativePath>../cps-parent/pom.xml</relativePath>

-  </parent>

-

-  <artifactId>cps-service</artifactId>

-

-  <properties>

-    <minimum-coverage>0.94</minimum-coverage>

-  </properties>

-

-  <dependencies>

-    <dependency>

-      <groupId>org.onap.cps</groupId>

-      <artifactId>cps-events</artifactId>

-    </dependency>

-    <dependency>

-      <groupId>org.opendaylight.yangtools</groupId>

-      <artifactId>yang-model-api</artifactId>

-    </dependency>

-    <dependency>

-      <groupId>org.opendaylight.yangtools</groupId>

-      <artifactId>yang-parser-api</artifactId>

-    </dependency>

-    <dependency>

-      <groupId>org.opendaylight.yangtools</groupId>

-      <artifactId>yang-parser-impl</artifactId>

-    </dependency>

-    <dependency>

-      <groupId>org.opendaylight.yangtools</groupId>

-      <artifactId>yang-model-util</artifactId>

-    </dependency>

-    <!-- required for processing yang data in json format -->

-    <dependency>

-      <groupId>org.opendaylight.yangtools</groupId>

-      <artifactId>yang-data-codec-gson</artifactId>

-    </dependency>

-    <dependency>

-      <groupId>org.projectlombok</groupId>

-      <artifactId>lombok</artifactId>

-    </dependency>

-    <dependency>

-      <groupId>org.springframework.boot</groupId>

-      <artifactId>spring-boot-starter-cache</artifactId>

-    </dependency>

-    <dependency>

-      <groupId>com.github.ben-manes.caffeine</groupId>

-      <artifactId>caffeine</artifactId>

-    </dependency>

-    <dependency>

-      <groupId>org.springframework.kafka</groupId>

-      <artifactId>spring-kafka</artifactId>

-    </dependency>

-    <dependency>

-      <groupId>org.springframework</groupId>

-      <artifactId>spring-messaging</artifactId>

-    </dependency>

-    <dependency>

-      <!-- For logging -->

-      <groupId>org.slf4j</groupId>

-      <artifactId>slf4j-api</artifactId>

-    </dependency>

-    <dependency>

-      <!-- For dependency injection -->

-      <groupId>org.springframework</groupId>

-      <artifactId>spring-context</artifactId>

-    </dependency>

-    <dependency>

-      <groupId>org.springframework.boot</groupId>

-      <artifactId>spring-boot-starter-validation</artifactId>

-    </dependency>

-    <dependency>

-      <!-- For parsing JSON object -->

-      <groupId>com.google.code.gson</groupId>

-      <artifactId>gson</artifactId>

-    </dependency>

-    <dependency>

-      <groupId>org.springframework.boot</groupId>

-      <artifactId>spring-boot-starter-aop</artifactId>

-    </dependency>

-    <dependency>

-      <groupId>net.logstash.logback</groupId>

-      <artifactId>logstash-logback-encoder</artifactId>

-    </dependency>

-    <dependency>

-      <groupId>org.codehaus.janino</groupId>

-      <artifactId>janino</artifactId>

-    </dependency>

-    <!-- T E S T   D E P E N D E N C I E S -->

-    <dependency>

-      <groupId>org.codehaus.groovy</groupId>

-      <artifactId>groovy</artifactId>

-      <scope>test</scope>

-    </dependency>

-    <dependency>

-      <groupId>org.codehaus.groovy</groupId>

-      <artifactId>groovy-json</artifactId>

-      <scope>test</scope>

-    </dependency>

-    <dependency>

-      <groupId>org.spockframework</groupId>

-      <artifactId>spock-core</artifactId>

-      <scope>test</scope>

-    </dependency>

-    <dependency>

-      <groupId>org.spockframework</groupId>

-      <artifactId>spock-spring</artifactId>

-      <scope>test</scope>

-    </dependency>

-    <dependency>

-      <groupId>org.springframework.boot</groupId>

-      <artifactId>spring-boot-starter-test</artifactId>

-      <scope>test</scope>

-    </dependency>

-    <dependency>

-      <groupId>cglib</groupId>

-      <artifactId>cglib-nodep</artifactId>

-      <scope>test</scope>

-    </dependency>

-    <dependency>

-      <groupId>org.testcontainers</groupId>

-      <artifactId>kafka</artifactId>

-      <scope>test</scope>

-    </dependency>

-    <dependency>

-      <groupId>org.springframework.kafka</groupId>

-      <artifactId>spring-kafka-test</artifactId>

-      <scope>test</scope>

-    </dependency>

-    <dependency>

-      <groupId>org.aspectj</groupId>

-      <artifactId>aspectjrt</artifactId>

-      <scope>test</scope>

-    </dependency>

-  </dependencies>

-</project>

+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ============LICENSE_START=======================================================
+  Copyright (C) 2021-2022 Nordix Foundation
+  Modifications Copyright (C) 2021 Bell Canada.
+  Modifications Copyright (C) 2021 Pantheon.tech
+  ================================================================================
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+  SPDX-License-Identifier: Apache-2.0
+  ============LICENSE_END=========================================================
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.onap.cps</groupId>
+    <artifactId>cps-parent</artifactId>
+    <version>3.1.0-SNAPSHOT</version>
+    <relativePath>../cps-parent/pom.xml</relativePath>
+  </parent>
+
+  <artifactId>cps-service</artifactId>
+
+  <properties>
+    <minimum-coverage>0.94</minimum-coverage>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.onap.cps</groupId>
+      <artifactId>cps-events</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-model-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-parser-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-parser-impl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-model-util</artifactId>
+    </dependency>
+    <!-- required for processing yang data in json format -->
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-data-codec-gson</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-cache</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-messaging</artifactId>
+    </dependency>
+    <dependency>
+      <!-- For logging -->
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <!-- For dependency injection -->
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-context</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-validation</artifactId>
+    </dependency>
+    <dependency>
+      <!-- For parsing JSON object -->
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-aop</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>net.logstash.logback</groupId>
+      <artifactId>logstash-logback-encoder</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.janino</groupId>
+      <artifactId>janino</artifactId>
+    </dependency>
+    <!-- T E S T   D E P E N D E N C I E S -->
+    <dependency>
+      <groupId>org.codehaus.groovy</groupId>
+      <artifactId>groovy</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.groovy</groupId>
+      <artifactId>groovy-json</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.spockframework</groupId>
+      <artifactId>spock-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.spockframework</groupId>
+      <artifactId>spock-spring</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>cglib</groupId>
+      <artifactId>cglib-nodep</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>kafka</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.aspectj</groupId>
+      <artifactId>aspectjrt</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
index 2f1067a..0772a8c 100755
--- a/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
+++ b/cps-service/src/main/java/org/onap/cps/api/impl/CpsDataServiceImpl.java
@@ -22,6 +22,10 @@
 
 package org.onap.cps.api.impl;
 
+import static org.onap.cps.notification.Operation.CREATE;
+import static org.onap.cps.notification.Operation.DELETE;
+import static org.onap.cps.notification.Operation.UPDATE;
+
 import java.time.OffsetDateTime;
 import java.util.Collection;
 import lombok.AllArgsConstructor;
@@ -61,7 +65,7 @@
         CpsValidator.validateNameCharacters(dataspaceName, anchorName);
         final DataNode dataNode = buildDataNode(dataspaceName, anchorName, ROOT_NODE_XPATH, jsonData);
         cpsDataPersistenceService.storeDataNode(dataspaceName, anchorName, dataNode);
-        processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, ROOT_NODE_XPATH, Operation.CREATE);
+        processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, ROOT_NODE_XPATH, CREATE);
     }
 
     @Override
@@ -70,7 +74,7 @@
         CpsValidator.validateNameCharacters(dataspaceName, anchorName);
         final DataNode dataNode = buildDataNode(dataspaceName, anchorName, parentNodeXpath, jsonData);
         cpsDataPersistenceService.addChildDataNode(dataspaceName, anchorName, parentNodeXpath, dataNode);
-        processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, Operation.CREATE);
+        processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, CREATE);
     }
 
     @Override
@@ -81,7 +85,7 @@
             buildDataNodes(dataspaceName, anchorName, parentNodeXpath, jsonData);
         cpsDataPersistenceService.addListElements(dataspaceName, anchorName, parentNodeXpath,
             listElementDataNodeCollection);
-        processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, Operation.UPDATE);
+        processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, UPDATE);
     }
 
     @Override
@@ -98,7 +102,7 @@
         final DataNode dataNode = buildDataNode(dataspaceName, anchorName, parentNodeXpath, jsonData);
         cpsDataPersistenceService
             .updateDataLeaves(dataspaceName, anchorName, dataNode.getXpath(), dataNode.getLeaves());
-        processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, Operation.UPDATE);
+        processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, UPDATE);
     }
 
     @Override
@@ -113,7 +117,7 @@
         for (final DataNode dataNodeUpdate : dataNodeUpdates) {
             processDataNodeUpdate(dataspaceName, anchorName, dataNodeUpdate);
         }
-        processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, Operation.UPDATE);
+        processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, UPDATE);
     }
 
     @Override
@@ -143,7 +147,7 @@
         CpsValidator.validateNameCharacters(dataspaceName, anchorName);
         final DataNode dataNode = buildDataNode(dataspaceName, anchorName, parentNodeXpath, jsonData);
         cpsDataPersistenceService.replaceDataNodeTree(dataspaceName, anchorName, dataNode);
-        processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, Operation.UPDATE);
+        processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, UPDATE);
     }
 
     @Override
@@ -160,7 +164,7 @@
             final Collection<DataNode> dataNodes, final OffsetDateTime observedTimestamp) {
         CpsValidator.validateNameCharacters(dataspaceName, anchorName);
         cpsDataPersistenceService.replaceListContent(dataspaceName, anchorName, parentNodeXpath, dataNodes);
-        processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, Operation.UPDATE);
+        processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, parentNodeXpath, UPDATE);
     }
 
     @Override
@@ -168,7 +172,7 @@
                                final OffsetDateTime observedTimestamp) {
         CpsValidator.validateNameCharacters(dataspaceName, anchorName);
         cpsDataPersistenceService.deleteDataNode(dataspaceName, anchorName, dataNodeXpath);
-        processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, dataNodeXpath, Operation.DELETE);
+        processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, dataNodeXpath, DELETE);
     }
 
     @Override
@@ -177,7 +181,7 @@
         CpsValidator.validateNameCharacters(dataspaceName, anchorName);
         final Anchor anchor = cpsAdminService.getAnchor(dataspaceName, anchorName);
         cpsDataPersistenceService.deleteDataNodes(dataspaceName, anchorName);
-        processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, Operation.DELETE, observedTimestamp);
+        processDataUpdatedEventAsync(anchor, ROOT_NODE_XPATH, DELETE, observedTimestamp);
     }
 
     @Override
@@ -185,7 +189,7 @@
         final OffsetDateTime observedTimestamp) {
         CpsValidator.validateNameCharacters(dataspaceName, anchorName);
         cpsDataPersistenceService.deleteListDataNode(dataspaceName, anchorName, listNodeXpath);
-        processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, listNodeXpath, Operation.DELETE);
+        processDataUpdatedEventAsync(dataspaceName, anchorName, observedTimestamp, listNodeXpath, DELETE);
     }
 
     private DataNode buildDataNode(final String dataspaceName, final String anchorName,
@@ -233,10 +237,13 @@
         this.processDataUpdatedEventAsync(anchor, xpath, operation, observedTimestamp);
     }
 
-    private void processDataUpdatedEventAsync(final Anchor anchor, final String xpath, final Operation operation,
+    private void processDataUpdatedEventAsync(final Anchor anchor,
+                                              final String xpath,
+                                              final Operation operation,
         final OffsetDateTime observedTimestamp) {
         try {
-            notificationService.processDataUpdatedEvent(anchor, observedTimestamp, xpath, operation);
+            notificationService.processDataUpdatedEvent(anchor, observedTimestamp, xpath,
+                operation);
         } catch (final Exception exception) {
             //If async message can't be queued for notification service, the initial request should not failed.
             log.error("Failed to send message to notification service", exception);
diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/KafkaTestContainerConfig.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/KafkaTestContainerConfig.groovy
index 5124a51..05b9624 100644
--- a/cps-service/src/test/groovy/org/onap/cps/notification/KafkaTestContainerConfig.groovy
+++ b/cps-service/src/test/groovy/org/onap/cps/notification/KafkaTestContainerConfig.groovy
@@ -33,7 +33,7 @@
     // Not the best performance but it is good enough for test case
     private static synchronized KafkaContainer getKafkaContainer() {
         if (kafkaContainer == null) {
-            kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.1.1"))
+            kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
                     .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
             kafkaContainer.start()
             Runtime.getRuntime().addShutdownHook(new Thread(kafkaContainer::stop))
diff --git a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy
index 6ef6874..8263c31 100644
--- a/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy
+++ b/cps-service/src/test/groovy/org/onap/cps/notification/NotificationServiceSpec.groovy
@@ -29,7 +29,6 @@
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.boot.test.context.SpringBootTest
-import org.springframework.scheduling.annotation.EnableAsync
 import org.springframework.test.context.ContextConfiguration
 import spock.lang.Shared
 import spock.lang.Specification
@@ -107,18 +106,18 @@
             1 * mockNotificationPublisher.sendNotification(cpsDataUpdatedEvent)
         where:
             scenario                                   | xpath           | operation            || expectedOperationInEvent
-            'Same event is sent when root nodes'       | ''              | Operation.CREATE     || Operation.CREATE
-            'Same event is sent when root nodes'       | ''              | Operation.UPDATE     || Operation.UPDATE
-            'Same event is sent when root nodes'       | ''              | Operation.DELETE     || Operation.DELETE
-            'Same event is sent when root nodes'       | '/'             | Operation.CREATE     || Operation.CREATE
-            'Same event is sent when root nodes'       | '/'             | Operation.UPDATE     || Operation.UPDATE
-            'Same event is sent when root nodes'       | '/'             | Operation.DELETE     || Operation.DELETE
-            'Same event is sent when container nodes'  | '/parent'       | Operation.CREATE     || Operation.CREATE
-            'Same event is sent when container nodes'  | '/parent'       | Operation.UPDATE     || Operation.UPDATE
-            'Same event is sent when container nodes'  | '/parent'       | Operation.DELETE     || Operation.DELETE
-            'UPDATE event is sent when non root nodes' | '/parent/child' | Operation.CREATE     || Operation.UPDATE
-            'UPDATE event is sent when non root nodes' | '/parent/child' | Operation.UPDATE     || Operation.UPDATE
-            'UPDATE event is sent when non root nodes' | '/parent/child' | Operation.DELETE     || Operation.UPDATE
+            'Same event is sent when root nodes'       | ''              | Operation.CREATE || Operation.CREATE
+            'Same event is sent when root nodes'       | ''              | Operation.UPDATE || Operation.UPDATE
+            'Same event is sent when root nodes'       | ''              | Operation.DELETE || Operation.DELETE
+            'Same event is sent when root nodes'       | '/'             | Operation.CREATE || Operation.CREATE
+            'Same event is sent when root nodes'       | '/'             | Operation.UPDATE || Operation.UPDATE
+            'Same event is sent when root nodes'       | '/'             | Operation.DELETE || Operation.DELETE
+            'Same event is sent when container nodes'  | '/parent'       | Operation.CREATE || Operation.CREATE
+            'Same event is sent when container nodes'  | '/parent'       | Operation.UPDATE || Operation.UPDATE
+            'Same event is sent when container nodes'  | '/parent'       | Operation.DELETE || Operation.DELETE
+            'UPDATE event is sent when non root nodes' | '/parent/child' | Operation.CREATE || Operation.UPDATE
+            'UPDATE event is sent when non root nodes' | '/parent/child' | Operation.UPDATE || Operation.UPDATE
+            'UPDATE event is sent when non root nodes' | '/parent/child' | Operation.DELETE || Operation.UPDATE
     }
 
     def 'Error handling in notification service.'() {
diff --git a/cps-service/src/test/resources/application.yml b/cps-service/src/test/resources/application.yml
index 436c3d4..a28b400 100644
--- a/cps-service/src/test/resources/application.yml
+++ b/cps-service/src/test/resources/application.yml
@@ -1,5 +1,6 @@
 #  ============LICENSE_START=======================================================
 #  Copyright (c) 2021 Bell Canada.
+#  Modification Copyright (C) 2022 Nordix Foundation.
 #  ================================================================================
 #  Licensed under the Apache License, Version 2.0 (the "License");
 #  you may not use this file except in compliance with the License.
diff --git a/csit/plans/cps/setup.sh b/csit/plans/cps/setup.sh
index d633b1e..5954240 100755
--- a/csit/plans/cps/setup.sh
+++ b/csit/plans/cps/setup.sh
@@ -1,6 +1,7 @@
 #!/bin/bash
 #
 # Copyright 2016-2017 Huawei Technologies Co., Ltd.
+# Modifications Copyright (C) 2022 Nordix Foundation.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -64,20 +65,7 @@
 curl -L https://github.com/docker/compose/releases/download/1.25.0/docker-compose-`uname -s`-`uname -m` > docker-compose
 chmod +x docker-compose
 
-# start CPS and PostgreSQL containers with docker compose
-./docker-compose up -d
-
-###################### setup onap-dmi-plugin ############################
-
-cd $WORKSPACE/archives
-git clone "https://gerrit.onap.org/r/cps/ncmp-dmi-plugin"
-mkdir -p $WORKSPACE/archives/dc-dmi
-cat $WORKSPACE/archives/ncmp-dmi-plugin/docker-compose/docker-compose.yml
-cp $WORKSPACE/archives/ncmp-dmi-plugin/docker-compose/*.yml $WORKSPACE/archives/dc-dmi
-cd $WORKSPACE/archives/dc-dmi
-# copy docker-compose (downloaded already for cps)
-cp $WORKSPACE/archives/dc-cps/docker-compose .
-chmod +x docker-compose
+# start CPS/NCMP, DMI, and PostgreSQL containers with docker compose
 ./docker-compose up -d
 
 ###################### setup sdnc #######################################
diff --git a/csit/tests/ncmp-passthrough/ncmp-passthrough.robot b/csit/tests/ncmp-passthrough/ncmp-passthrough.robot
index 32d9604..95a8d53 100644
--- a/csit/tests/ncmp-passthrough/ncmp-passthrough.robot
+++ b/csit/tests/ncmp-passthrough/ncmp-passthrough.robot
@@ -36,6 +36,13 @@
 
 *** Test Cases ***
 
+Get for Passthrough Operational (CF, RO) with fields & topic
+    ${uri}=              Set Variable       ${ncmpBasePath}/v1/ch/PNFDemo/data/ds/ncmp-datastore:passthrough-operational?resourceIdentifier=ietf-netconf-monitoring:netconf-state&options=(fields=schemas/schema)&topic=test-topic
+    ${headers}=          Create Dictionary  Authorization=${auth}
+    ${response}=         Get On Session     CPS_URL   ${uri}   headers=${headers}   expected_status=200
+    ${responseJson}=     Set Variable       ${response.json()}
+    Should Be Equal As Strings              ${response.status_code}   200
+
 Get for Passthrough Operational (CF, RO) with fields
     ${uri}=              Set Variable       ${ncmpBasePath}/v1/ch/PNFDemo/data/ds/ncmp-datastore:passthrough-operational?resourceIdentifier=ietf-netconf-monitoring:netconf-state&options=(fields=schemas/schema)
     ${headers}=          Create Dictionary  Authorization=${auth}
@@ -136,4 +143,4 @@
     ${verifyUri}=       Set Variable       ${ncmpBasePath}/v1/ch/PNFDemo/data/ds/ncmp-datastore:passthrough-running?resourceIdentifier=stores:bookstore/categories=02/books=A%20New%20book%20in%20existing%20category
     ${verifyResponse}=  Get On Session     CPS_URL   ${verifyUri}   headers=${verifyHeaders}
     ${responseJson}=    Set Variable       ${verifyResponse.json()}
-    Should Be Equal As Strings             ${verifyResponse.status_code}   200
\ No newline at end of file
+    Should Be Equal As Strings             ${verifyResponse.status_code}   200
diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml
index 44ebd3b..f2f477f 100755
--- a/docker-compose/docker-compose.yml
+++ b/docker-compose/docker-compose.yml
@@ -1,7 +1,7 @@
 # ============LICENSE_START=======================================================
 # Copyright (c) 2020 Pantheon.tech.
 # Modifications Copyright (C) 2021 Bell Canada.
-# Modifications Copyright (C) 2021 Nordix Foundation
+# Modifications Copyright (C) 2021-2022 Nordix Foundation.
 # ================================================================================
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -64,7 +64,7 @@
   #    - dbpostgresql
 
   #  zookeeper:
-  #    image: confluentinc/cp-zookeeper:6.1.1
+  #    image: confluentinc/cp-zookeeper:6.2.1
   #    environment:
   #      ZOOKEEPER_CLIENT_PORT: 2181
   #      ZOOKEEPER_TICK_TIME: 2000
@@ -72,7 +72,7 @@
   #      - 22181:2181
   #
   #  kafka:
-  #    image: confluentinc/cp-kafka:6.1.1
+  #    image: confluentinc/cp-kafka:6.2.1
   #    depends_on:
   #      - zookeeper
   #    ports:
@@ -109,9 +109,58 @@
       DB_PASSWORD: ${DB_PASSWORD:-cps}
       DMI_USERNAME: ${DMI_USERNAME:-cpsuser}
       DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!}
-      #KAFKA_BOOTSTRAP_SERVER: kafka:9092
-      #notification.data-updated.enabled: 'true'
-      #NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*'
+      KAFKA_BOOTSTRAP_SERVER: kafka:9092
+      notification.data-updated.enabled: 'true'
+      NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*'
     restart: unless-stopped
     depends_on:
-      - dbpostgresql
\ No newline at end of file
+      - dbpostgresql
+
+  ### if kafka is not required comment out zookeeper and kafka ###
+  zookeeper:
+    image: confluentinc/cp-zookeeper:6.2.1
+    container_name: zookeeper
+    ports:
+      - '2181:2181'
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+
+  kafka:
+    image: confluentinc/cp-kafka:6.2.1
+    container_name: kafka
+    ports:
+      - "19092:19092"
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+
+  ### Comment out this section if dmi plugin is not required ###
+  ncmp-dmi-plugin:
+    container_name: ncmp-dmi-plugin
+    image: ${DOCKER_REPO:-nexus3.onap.org:10003}/onap/ncmp-dmi-plugin:${DMI_VERSION:-1.2.0-SNAPSHOT-latest}
+    ports:
+      - ${DMI_PORT:-8783}:8080
+      - ${DMI_MANAGEMENT_PORT:-8787}:8081
+    environment:
+      CPS_USERNAME: ${CPS_CORE_USERNAME:-cpsuser}
+      CPS_PASSWORD: ${CPS_CORE_PASSWORD:-cpsr0cks!}
+      CPS_CORE_HOST: ${CPS_CORE_HOST:-cps-and-ncmp}
+      CPS_CORE_PORT: ${CPS_CORE_PORT:-8080}
+      CPS_CORE_USERNAME: ${CPS_CORE_USERNAME:-cpsuser}
+      CPS_CORE_PASSWORD: ${CPS_CORE_PASSWORD:-cpsr0cks!}
+      SDNC_HOST: ${SDNC_HOST:-sdnc}
+      SDNC_PORT: ${SDNC_PORT:-8181}
+      SDNC_USERNAME: ${SDNC_USERNAME:-admin}
+      SDNC_PASSWORD: ${SDNC_PASSWORD:-Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U}
+      DMI_SERVICE_URL: ${DMI_SERVICE_URL:-http://ncmp-dmi-plugin:8783}
+      DMI_USERNAME: ${DMI_USERNAME:-cpsuser}
+      DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!}
+      KAFKA_BOOTSTRAP_SERVER: kafka:9092
+      notification.data-updated.enabled: 'true'
+      NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*'
+    restart: unless-stopped
diff --git a/jacoco-report/pom.xml b/jacoco-report/pom.xml
index d1181d3..b8f18e7 100644
--- a/jacoco-report/pom.xml
+++ b/jacoco-report/pom.xml
@@ -69,6 +69,7 @@
                         <exclude>org/onap/cps/ncmp/rest/model/*</exclude>
                         <exclude>org/onap/cps/ncmp/rest/controller/*MapperImpl.class</exclude>
                         <exclude>org/onap/cps/rest/controller/*MapperImpl.class</exclude>
+                        <exclude>org/onap/cps/ncmp/api/impl/async/*MapperImpl.class</exclude>
                     </excludes>
                 </configuration>
                 <executions>
diff --git a/pom.xml b/pom.xml
index 23ef44b..12d8a1f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,7 @@
         <module>cps-events</module>

         <module>cps-service</module>

         <module>cps-rest</module>

+        <module>cps-ncmp-events</module>

         <module>cps-ncmp-service</module>

         <module>cps-ncmp-rest</module>

         <module>cps-path-parser</module>