Merge "Cm Subscription: Remove subscription method"
diff --git a/cps-ncmp-rest/docs/openapi/components.yaml b/cps-ncmp-rest/docs/openapi/components.yaml
index 0ad453a..8aa38c0 100644
--- a/cps-ncmp-rest/docs/openapi/components.yaml
+++ b/cps-ncmp-rest/docs/openapi/components.yaml
@@ -366,6 +366,7 @@
           type: array
           items:
             type: string
+            description: targeted cm handles, maximum of 50 supported. If this limit is exceeded the request wil be refused.
             example: [ "da310eecdb8d44c2acc0ddaae01174b1","c748c58f8e0b438f9fd1f28370b17d47" ]
 
   examples:
@@ -695,7 +696,7 @@
           schema:
             $ref: '#/components/schemas/ErrorMessage'
           example:
-            status: 400 BAD_REQUEST
+            status: 400
             message: Bad request error message
             details: Bad request error details
     Conflict:
@@ -705,9 +706,19 @@
           schema:
             $ref: '#/components/schemas/ErrorMessage'
           example:
-            status: 409 CONFLICT
+            status: 409
             message: Conflict error message
             details: Conflict error details
+    PayloadTooLarge:
+      description: The request is larger than the server is willing or able to process
+      content:
+        application/json:
+          schema:
+            $ref: '#/components/schemas/ErrorMessage'
+          example:
+            status: 413
+            message: Payload Too Large error message
+            details: Payload Too Large error details
     NotImplemented:
       description: The given path has not been implemented
       content:
diff --git a/cps-ncmp-rest/docs/openapi/ncmp.yml b/cps-ncmp-rest/docs/openapi/ncmp.yml
index 0cb1cdf..d0b1f35 100755
--- a/cps-ncmp-rest/docs/openapi/ncmp.yml
+++ b/cps-ncmp-rest/docs/openapi/ncmp.yml
@@ -194,7 +194,7 @@
     tags:
       - network-cm-proxy
     summary: Execute a data operation for group of cm handle ids
-    description: This request will be handled asynchronously using messaging to the supplied topic. The rest response will be an acknowledge with a requestId to identify the relevant messages.
+    description: This request will be handled asynchronously using messaging to the supplied topic. The rest response will be an acknowledge with a requestId to identify the relevant messages. A maximum of 50 cm handles per operation is supported.
     operationId: executeDataOperationForCmHandles
     parameters:
       - $ref: 'components.yaml#/components/parameters/requiredTopicParamInQuery'
@@ -216,6 +216,8 @@
         $ref: 'components.yaml#/components/responses/BadRequest'
       403:
         $ref: 'components.yaml#/components/responses/Forbidden'
+      413:
+        $ref: 'components.yaml#/components/responses/PayloadTooLarge'
       500:
         $ref: 'components.yaml#/components/responses/InternalServerError'
       502:
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
index 75112ca..eca7ebf 100644
--- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
@@ -33,6 +33,7 @@
 import org.onap.cps.ncmp.api.models.CmResourceAddress;
 import org.onap.cps.ncmp.api.models.DataOperationRequest;
 import org.onap.cps.ncmp.rest.exceptions.OperationNotSupportedException;
+import org.onap.cps.ncmp.rest.exceptions.PayloadTooLargeException;
 import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor;
 import org.onap.cps.ncmp.rest.util.TopicValidator;
 import org.springframework.http.ResponseEntity;
@@ -45,6 +46,10 @@
 
     private static final Object noReturn = null;
 
+    private static final int MAXIMUM_CM_HANDLES_PER_OPERATION = 50;
+
+    private static final String PAYLOAD_TOO_LARGE_TEMPLATE = "Operation '%s' affects too many (%d) cm handles";
+
     /**
      * Constructor.
      *
@@ -101,17 +106,23 @@
     }
 
     private void validateDataOperationRequest(final String topicParamInQuery,
-                                              final DataOperationRequest
-                                                  dataOperationRequest) {
+                                              final DataOperationRequest dataOperationRequest) {
         TopicValidator.validateTopicName(topicParamInQuery);
         dataOperationRequest.getDataOperationDefinitions().forEach(dataOperationDetail -> {
             if (OperationType.fromOperationName(dataOperationDetail.getOperation()) != READ) {
                 throw new OperationNotSupportedException(
                     dataOperationDetail.getOperation() + " operation not yet supported");
-            } else if (DatastoreType.fromDatastoreName(dataOperationDetail.getDatastore()) == OPERATIONAL) {
+            }
+            if (DatastoreType.fromDatastoreName(dataOperationDetail.getDatastore()) == OPERATIONAL) {
                 throw new InvalidDatastoreException(dataOperationDetail.getDatastore()
                     + " datastore is not supported");
             }
+            if (dataOperationDetail.getCmHandleIds().size() > MAXIMUM_CM_HANDLES_PER_OPERATION) {
+                final String errorMessage = String.format(PAYLOAD_TOO_LARGE_TEMPLATE,
+                    dataOperationDetail.getOperationId(),
+                    dataOperationDetail.getCmHandleIds().size());
+                throw new PayloadTooLargeException(errorMessage);
+            }
         });
     }
 
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandler.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandler.java
index 7498c5f..d323691 100755
--- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandler.java
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandler.java
@@ -1,7 +1,7 @@
 /*
  *  ============LICENSE_START=======================================================
  *  Copyright (C) 2021 Pantheon.tech
- *  Modifications Copyright (C) 2021-2023 Nordix Foundation
+ *  Modifications Copyright (C) 2021-2024 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -60,8 +60,7 @@
      * @return response with response code 500.
      */
     @ExceptionHandler
-    public static ResponseEntity<Object> handleInternalServerErrorExceptions(
-            final Exception exception) {
+    public static ResponseEntity<Object> handleInternalServerErrorExceptions(final Exception exception) {
         return buildErrorResponse(HttpStatus.INTERNAL_SERVER_ERROR, exception);
     }
 
@@ -73,7 +72,7 @@
     @ExceptionHandler({HttpClientRequestException.class})
     public static ResponseEntity<Object> handleClientRequestExceptions(
             final HttpClientRequestException httpClientRequestException) {
-        return wrapDmiErrorResponse(HttpStatus.BAD_GATEWAY, httpClientRequestException);
+        return wrapDmiErrorResponse(httpClientRequestException);
     }
 
     @ExceptionHandler({DmiRequestException.class, DataValidationException.class, OperationNotSupportedException.class,
@@ -88,10 +87,15 @@
     }
 
     @ExceptionHandler({DataNodeNotFoundException.class})
-    public static ResponseEntity<Object> handleNotFoundExceptions(final CpsException exception) {
+    public static ResponseEntity<Object> handleNotFoundExceptions(final Exception exception) {
         return buildErrorResponse(HttpStatus.NOT_FOUND, exception);
     }
 
+    @ExceptionHandler({PayloadTooLargeException.class})
+    public static ResponseEntity<Object> handlePayloadTooLargeExceptions(final Exception exception) {
+        return buildErrorResponse(HttpStatus.PAYLOAD_TOO_LARGE, exception);
+    }
+
     private static ResponseEntity<Object> buildErrorResponse(final HttpStatus status, final Exception exception) {
         if (exception.getCause() != null || !(exception instanceof CpsException)) {
             log.error("Exception occurred", exception);
@@ -111,15 +115,14 @@
         return new ResponseEntity<>(errorMessage, status);
     }
 
-    private static ResponseEntity<Object> wrapDmiErrorResponse(
-            final HttpStatus httpStatus,
-            final HttpClientRequestException httpClientRequestException) {
+    private static ResponseEntity<Object> wrapDmiErrorResponse(final HttpClientRequestException
+                                                                     httpClientRequestException) {
         final var dmiErrorMessage = new DmiErrorMessage();
         final var dmiErrorResponse = new DmiErrorMessageDmiResponse();
         dmiErrorResponse.setHttpCode(httpClientRequestException.getHttpStatus());
         dmiErrorResponse.setBody(httpClientRequestException.getDetails());
         dmiErrorMessage.setMessage(httpClientRequestException.getMessage());
         dmiErrorMessage.setDmiResponse(dmiErrorResponse);
-        return new ResponseEntity<>(dmiErrorMessage, httpStatus);
+        return new ResponseEntity<>(dmiErrorMessage, HttpStatus.BAD_GATEWAY);
     }
 }
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/PayloadTooLargeException.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/PayloadTooLargeException.java
new file mode 100644
index 0000000..cddbd08
--- /dev/null
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/PayloadTooLargeException.java
@@ -0,0 +1,31 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.rest.exceptions;
+
+public class PayloadTooLargeException extends RuntimeException {
+
+    /**
+     * Instantiates a new payload too large exception.
+     */
+    public PayloadTooLargeException(final String message) {
+        super(message);
+    }
+}
diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
index bdd0e71..aef37c9 100644
--- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
+++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
@@ -27,6 +27,7 @@
 import org.onap.cps.ncmp.api.models.DataOperationRequest
 import org.onap.cps.ncmp.api.models.CmResourceAddress
 import org.onap.cps.ncmp.rest.exceptions.OperationNotSupportedException
+import org.onap.cps.ncmp.rest.exceptions.PayloadTooLargeException
 import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor
 import spock.lang.Specification
 import spock.util.concurrent.PollingConditions
@@ -110,9 +111,7 @@
     }
 
     def 'Attempt to execute async data operation request with error #scenario'() {
-        given: 'notification feature is turned on'
-            objectUnderTest.notificationFeatureEnabled = true
-        and: 'a data operation definition with datastore: #datastore'
+        given: 'a data operation definition with datastore: #datastore'
             def dataOperationDefinition = new DataOperationDefinition(operation: 'read', datastore: datastore)
         when: 'data operation request is executed'
             def dataOperationRequest = new DataOperationRequest(dataOperationDefinitions: [dataOperationDefinition])
@@ -127,11 +126,9 @@
     }
 
     def 'Attempt to execute async data operation request with #scenario operation: #operation.'() {
-        given: 'notification feature is turned on'
-            objectUnderTest.notificationFeatureEnabled = true
-        and: 'a data operation definition with operation: #operation'
+        given: 'a data operation definition with operation: #operation'
             def dataOperationDefinition = new DataOperationDefinition(operation: operation, datastore: 'ncmp-datastore:passthrough-running')
-        when: 'bulk request is executed'
+        when: 'data operation request is executed'
             objectUnderTest.executeRequest('someTopic', new DataOperationRequest(dataOperationDefinitions:[dataOperationDefinition]), NO_AUTH_HEADER)
         then: 'the expected type of exception is thrown'
             thrown(expectedException)
@@ -144,4 +141,16 @@
             'unsupported' | 'delete'  || OperationNotSupportedException
     }
 
+    def 'Attempt to execute async data operation request with too many cm handles.'() {
+        given: 'a data operation definition with too many cm handles'
+            def cmHandleIds = new String[51]
+            def dataOperationDefinition = new DataOperationDefinition(operationId: 'abc', operation: 'read', datastore: 'ncmp-datastore:passthrough-running', cmHandleIds: cmHandleIds)
+        when: 'data operation request is executed'
+            objectUnderTest.executeRequest('someTopic', new DataOperationRequest(dataOperationDefinitions:[dataOperationDefinition]), NO_AUTH_HEADER)
+        then: 'a payload too large exception is thrown'
+            def exceptionThrown = thrown(PayloadTooLargeException)
+        and: 'the error message contains the offending number of cm handles'
+            assert exceptionThrown.message == "Operation 'abc' affects too many (51) cm handles"
+    }
+
 }
diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy
index dd02b31..a79ea25 100644
--- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy
+++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy
@@ -1,7 +1,7 @@
 /*
  *  ============LICENSE_START=======================================================
  *  Copyright (C) 2021 highstreet technologies GmbH
- *  Modifications Copyright (C) 2021-2023 Nordix Foundation
+ *  Modifications Copyright (C) 2021-2024 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -23,9 +23,10 @@
 
 import static org.springframework.http.HttpStatus.BAD_GATEWAY
 import static org.springframework.http.HttpStatus.BAD_REQUEST
+import static org.springframework.http.HttpStatus.CONFLICT
 import static org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR
 import static org.springframework.http.HttpStatus.NOT_FOUND
-import static org.springframework.http.HttpStatus.CONFLICT
+import static org.springframework.http.HttpStatus.PAYLOAD_TOO_LARGE
 import static org.onap.cps.ncmp.rest.exceptions.NetworkCmProxyRestExceptionHandlerSpec.ApiType.NCMP
 import static org.onap.cps.ncmp.rest.exceptions.NetworkCmProxyRestExceptionHandlerSpec.ApiType.NCMPINVENTORY
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get
@@ -111,22 +112,23 @@
         dataNodeBaseEndpointNcmpInventory = "$basePathNcmpInventory/v1"
     }
 
-    def 'Get request with generic #scenario exception returns correct HTTP Status with #scenario'() {
+    def 'Get request with #scenario exception returns correct HTTP Status with #scenario'() {
         when: 'an exception is thrown by the service'
             setupTestException(exception, NCMP)
             def response = performTestRequest(NCMP)
         then: 'an HTTP response is returned with correct message and details'
             assertTestResponse(response, expectedErrorCode, expectedErrorMessage, expectedErrorDetails)
         where:
-            scenario              | exception                                                        || expectedErrorDetails           | expectedErrorMessage        | expectedErrorCode
-            'CPS'                 | new CpsException(sampleErrorMessage, sampleErrorDetails)         || sampleErrorDetails             | sampleErrorMessage          | INTERNAL_SERVER_ERROR
-            'NCMP-server'         | new ServerNcmpException(sampleErrorMessage, sampleErrorDetails)  || null                           | sampleErrorMessage          | INTERNAL_SERVER_ERROR
-            'NCMP-client'         | new DmiRequestException(sampleErrorMessage, sampleErrorDetails)  || null                           | sampleErrorMessage          | BAD_REQUEST
-            'DataNode Validation' | new DataNodeNotFoundException('myDataspaceName', 'myAnchorName') || null                           | 'DataNode not found'        | NOT_FOUND
-            'other'               | new IllegalStateException(sampleErrorMessage)                    || null                           | sampleErrorMessage          | INTERNAL_SERVER_ERROR
-            'Data Node Not Found' | new DataNodeNotFoundException('myDataspaceName', 'myAnchorName') || 'DataNode not found'           | 'DataNode not found'        | NOT_FOUND
-            'Existing entry'      | new AlreadyDefinedException('name',null)                         || 'name already exists'          | 'Already defined exception' | CONFLICT
-            'Existing entries'    | AlreadyDefinedException.forDataNodes(['A', 'B'], 'myAnchorName') || '2 data node(s) already exist' | 'Already defined exception' | CONFLICT
+            scenario              | exception                                                        || expectedErrorCode     | expectedErrorMessage        | expectedErrorDetails
+            'CPS'                 | new CpsException(sampleErrorMessage, sampleErrorDetails)         || INTERNAL_SERVER_ERROR | sampleErrorMessage          | sampleErrorDetails
+            'NCMP-server'         | new ServerNcmpException(sampleErrorMessage, sampleErrorDetails)  || INTERNAL_SERVER_ERROR | sampleErrorMessage          | null
+            'NCMP-client'         | new DmiRequestException(sampleErrorMessage, sampleErrorDetails)  || BAD_REQUEST           | sampleErrorMessage          | null
+            'DataNode Validation' | new DataNodeNotFoundException('myDataspaceName', 'myAnchorName') || NOT_FOUND             | 'DataNode not found'        | null
+            'other'               | new IllegalStateException(sampleErrorMessage)                    || INTERNAL_SERVER_ERROR | sampleErrorMessage          | null
+            'Data Node Not Found' | new DataNodeNotFoundException('myDataspaceName', 'myAnchorName') || NOT_FOUND             | 'DataNode not found'        | 'DataNode not found'
+            'Existing entry'      | new AlreadyDefinedException('name',null)                         || CONFLICT              | 'Already defined exception' | 'name already exists'
+            'Existing entries'    | AlreadyDefinedException.forDataNodes(['A', 'B'], 'myAnchorName') || CONFLICT              | 'Already defined exception' | '2 data node(s) already exist'
+            'Operation too large' | new PayloadTooLargeException(sampleErrorMessage)                 || PAYLOAD_TOO_LARGE     | sampleErrorMessage          | 'Check logs'
     }
 
     def 'Post request with exception returns correct HTTP Status.'() {
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/DataJobService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/DataJobService.java
new file mode 100644
index 0000000..6122afc
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/DataJobService.java
@@ -0,0 +1,46 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api;
+
+import org.onap.cps.ncmp.api.models.datajob.DataJobMetadata;
+import org.onap.cps.ncmp.api.models.datajob.DataJobReadRequest;
+import org.onap.cps.ncmp.api.models.datajob.DataJobWriteRequest;
+
+public interface DataJobService {
+
+    /**
+     * process read data job operations.
+     *
+     * @param dataJobId          Unique identifier of the job within the request
+     * @param dataJobMetadata    data job request headers
+     * @param dataJobReadRequest read data job request
+     */
+    void readDataJob(String dataJobId, DataJobMetadata dataJobMetadata, DataJobReadRequest dataJobReadRequest);
+
+    /**
+     * process write data job operations.
+     *
+     * @param dataJobId           Unique identifier of the job within the request
+     * @param dataJobMetadata     data job request headers
+     * @param dataJobWriteRequest write data job request
+     */
+    void writeDataJob(String dataJobId, DataJobMetadata dataJobMetadata, DataJobWriteRequest dataJobWriteRequest);
+}
\ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpResponseStatus.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpResponseStatus.java
index 462679e..bdc3dee 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpResponseStatus.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpResponseStatus.java
@@ -37,7 +37,8 @@
     UNKNOWN_ERROR("108", "Unknown error"),
     CM_HANDLE_ALREADY_EXIST("109", "cm-handle already exists"),
     CM_HANDLE_INVALID_ID("110", "cm-handle has an invalid character(s) in id"),
-    ALTERNATE_ID_ALREADY_ASSOCIATED("111", "alternate id already associated");
+    ALTERNATE_ID_ALREADY_ASSOCIATED("111", "alternate id already associated"),
+    MESSAGE_TOO_LARGE("112", "message too large");
 
     private final String code;
     private final String message;
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/DataJobServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/DataJobServiceImpl.java
new file mode 100644
index 0000000..b4377b8
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/DataJobServiceImpl.java
@@ -0,0 +1,43 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.api.DataJobService;
+import org.onap.cps.ncmp.api.models.datajob.DataJobMetadata;
+import org.onap.cps.ncmp.api.models.datajob.DataJobReadRequest;
+import org.onap.cps.ncmp.api.models.datajob.DataJobWriteRequest;
+
+@Slf4j
+public class DataJobServiceImpl implements DataJobService {
+
+    @Override
+    public void readDataJob(final String dataJobId, final DataJobMetadata dataJobMetadata,
+                            final DataJobReadRequest dataJobReadRequest) {
+        log.info("data job id for read operation is: {}", dataJobId);
+    }
+
+    @Override
+    public void writeDataJob(final String dataJobId, final DataJobMetadata dataJobMetadata,
+                             final DataJobWriteRequest dataJobWriteRequest) {
+        log.info("data job id for write operation is: {}", dataJobId);
+    }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java
index 4f2674a..167df5a 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
+ * Copyright (C) 2023-2024 Nordix Foundation
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.ssl.SslBundles;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Primary;
@@ -51,6 +52,8 @@
 
     private final KafkaProperties kafkaProperties;
 
+    private static final SslBundles NO_SSL = null;
+
     /**
      * This sets the strategy for creating legacy Kafka producer instance from kafka properties defined into
      * application.yml and replaces value-serializer by JsonSerializer.
@@ -59,7 +62,7 @@
      */
     @Bean
     public ProducerFactory<String, T> legacyEventProducerFactory() {
-        final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
+        final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
         producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
         return new DefaultKafkaProducerFactory<>(producerConfigProperties);
     }
@@ -72,7 +75,7 @@
      */
     @Bean
     public ConsumerFactory<String, T> legacyEventConsumerFactory() {
-        final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
+        final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
         consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class);
         return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
     }
@@ -112,7 +115,7 @@
      */
     @Bean
     public ProducerFactory<String, CloudEvent> cloudEventProducerFactory() {
-        final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
+        final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
         return new DefaultKafkaProducerFactory<>(producerConfigProperties);
     }
 
@@ -124,7 +127,7 @@
      */
     @Bean
     public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactory() {
-        final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
+        final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
         return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
     }
 
@@ -136,7 +139,8 @@
      */
     @Bean
     public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
-        final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory());
+        final KafkaTemplate<String, CloudEvent> kafkaTemplate =
+            new KafkaTemplate<>(cloudEventProducerFactory());
         kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
         return kafkaTemplate;
     }
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumer.java
index ea72fd2..82ae546 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumer.java
@@ -23,16 +23,21 @@
 import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent;
 
 import io.cloudevents.CloudEvent;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
 
 @Component
 @Slf4j
+@RequiredArgsConstructor
 public class CmNotificationSubscriptionDmiOutEventConsumer {
 
+    private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler;
+
     /**
      * Consume the Cm Notification Subscription event from the dmi-plugin.
      *
@@ -56,7 +61,23 @@
             final CmNotificationSubscriptionDmiOutEvent cmNotificationSubscriptionDmiOutEvent) {
         final String subscriptionId = correlationId.split("#")[0];
         final String dmiPluginName = correlationId.split("#")[1];
+
+        if ("ACCEPTED".equals(cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage())) {
+            handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.ACCEPTED);
+            dmiCmNotificationSubscriptionCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
+        }
+
+        if ("REJECTED".equals(cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage())) {
+            handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.REJECTED);
+        }
+
         log.info("Cm Subscription with id : {} handled by the dmi-plugin : {} has the status : {}", subscriptionId,
                 dmiPluginName, cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage());
     }
+
+    private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName,
+                                         final CmNotificationSubscriptionStatus cmNotificationSubscriptionStatus) {
+        dmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi(subscriptionId,
+            dmiPluginName, cmNotificationSubscriptionStatus);
+    }
 }
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
index 2f10b1c..8c1cac3 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
@@ -30,8 +30,10 @@
 import java.util.Map;
 import java.util.Set;
 import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
 import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
 import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate;
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService;
 import org.onap.cps.ncmp.api.impl.inventory.InventoryPersistence;
 import org.onap.cps.ncmp.api.impl.operations.DatastoreType;
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
@@ -42,6 +44,7 @@
 @RequiredArgsConstructor
 public class DmiCmNotificationSubscriptionCacheHandler {
 
+    private final CmNotificationSubscriptionPersistenceService cmNotificationSubscriptionPersistenceService;
     private final Map<String, Map<String, DmiCmNotificationSubscriptionDetails>> cmNotificationSubscriptionCache;
     private final InventoryPersistence inventoryPersistence;
 
@@ -83,6 +86,46 @@
         return dmiCmNotificationSubscriptionDetailsPerDmi;
     }
 
+    /**
+     *  Update status in map of subscription details per DMI.
+     *
+     * @param subscriptionId    String of subscription Id
+     * @param dmiServiceName    String of dmiServiceName
+     * @param status            String of status
+     *
+     */
+    public void updateDmiCmNotificationSubscriptionStatusPerDmi(
+        final String subscriptionId, final String dmiServiceName, final CmNotificationSubscriptionStatus status) {
+        cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
+            .setCmNotificationSubscriptionStatus(status);
+    }
+
+    /**
+     *  Persist map of subscription details per DMI.
+     *
+     * @param subscriptionId    String of subscription Id
+     * @param dmiServiceName    String of dmiServiceName
+     *
+     */
+    public void persistIntoDatabasePerDmi(final String subscriptionId, final String dmiServiceName) {
+        final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicateList =
+            cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
+            .getDmiCmNotificationSubscriptionPredicates();
+        for (final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate:
+            dmiCmNotificationSubscriptionPredicateList) {
+            final DatastoreType datastoreType = dmiCmNotificationSubscriptionPredicate.getDatastoreType();
+            final Set<String> cmHandles = dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds();
+            final Set<String> xpaths = dmiCmNotificationSubscriptionPredicate.getXpaths();
+
+            for (final String cmHandle: cmHandles) {
+                for (final String xpath: xpaths) {
+                    cmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(datastoreType,
+                        cmHandle, xpath, subscriptionId);
+                }
+            }
+        }
+    }
+
     private void updateDmiCmNotificationSubscriptionDetailsPerDmi(
             final String dmiServiceName,
             final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate,
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobMetadata.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobMetadata.java
new file mode 100644
index 0000000..dc8037b
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobMetadata.java
@@ -0,0 +1,33 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.models.datajob;
+
+/**
+ * Metadata of read/write data job request.
+ *
+ * @param destination     The destination of the data job results.
+ * @param dataAcceptType  Define the data response accept type.
+ *                        e.g. "application/vnd.3gpp.object-tree-hierarchical+json",
+ *                        "application/vnd.3gpp.object-tree-flat+json" etc.
+ * @param dataContentType Define the data request content type.
+ *                        e.g. "application/3gpp-json-patch+json" etc.
+ */
+public record DataJobMetadata(String destination, String dataAcceptType, String dataContentType) {}
\ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobReadRequest.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobReadRequest.java
new file mode 100644
index 0000000..f861c3d
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobReadRequest.java
@@ -0,0 +1,30 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.models.datajob;
+
+import java.util.List;
+
+/**
+ * Describes the read data job operation to be forwarded to dmi.
+ *
+ * @param data List of read operations to be executed.
+ */
+public record DataJobReadRequest(List<ReadOperation> data) {}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobWriteRequest.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobWriteRequest.java
new file mode 100644
index 0000000..254e198
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobWriteRequest.java
@@ -0,0 +1,30 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.models.datajob;
+
+import java.util.List;
+
+/**
+ * Describes the write data job operation to be forwarded to dmi.
+ *
+ * @param data List of write operations to be executed.
+ */
+public record DataJobWriteRequest(List<WriteOperation> data) {}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/ReadOperation.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/ReadOperation.java
new file mode 100644
index 0000000..d2b0738
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/ReadOperation.java
@@ -0,0 +1,43 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.models.datajob;
+
+import java.util.List;
+
+/**
+ * Holds information of read data job operation.
+ * based on <a href="https://www.etsi.org/deliver/etsi_ts/128500_128599/128532/16.04.00_60/ts_128532v160400p.pdf">ETSI TS 128 532 V16.4.0 (2020-08)</a>
+ *
+ * @param path        Identifier of a managed object (MO) on a network element. Defines the resource on which operation
+ *                    is executed. Url Encoded Fully Distinguished Name (FDN).
+ * @param op          Describes the operation to execute. The value can only be "read".
+ * @param operationId Unique identifier of the operation within the request.
+ * @param attributes  Specifies the attributes of the resources that are returned.
+ * @param fields      Specifies the attribute fields of the resources that are returned. This should be used if an
+ *                    attribute is a struct and only a subset of its fields should be returned.
+ * @param filter      This filters the managed Objects.
+ * @param scopeType   This selects MOs depending on relationships with Base Managed Object.
+ *                    e.g. "BASE_ONLY", "BASE_ALL", "BASE_NTH_LEVEL" etc.
+ * @param scopeLevel  Defines the level for objects to be returned for certain scopeTypes. The base level is zero.
+ */
+public record ReadOperation(String path, String op, String operationId, List<String> attributes, List<String> fields,
+                            String filter, String scopeType, int scopeLevel) {
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/WriteOperation.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/WriteOperation.java
new file mode 100644
index 0000000..c2f6504
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/WriteOperation.java
@@ -0,0 +1,34 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.models.datajob;
+
+/**
+ * Holds information of write data job operation.
+ * based on <a href="https://www.etsi.org/deliver/etsi_ts/128500_128599/128532/16.04.00_60/ts_128532v160400p.pdf">ETSI TS 128 532 V16.4.0 (2020-08)</a>
+ *
+ * @param path        Identifier of a managed object (MO) on a network element. Defines the resource on which operation
+ *                    is executed. Typically, is Fully Distinguished Name (FDN).
+ * @param op          Describes the operation to execute.  The value can be as below:
+ *                    e.g. "add", "replace", "remove", "action" etc.
+ * @param operationId Unique identifier of the operation within the request.
+ * @param value       The value to be written depends on the type of operation.
+ */
+public record WriteOperation(String path, String op, String operationId, Object value) {}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/DataJobServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/DataJobServiceImplSpec.groovy
new file mode 100644
index 0000000..4378764
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/DataJobServiceImplSpec.groovy
@@ -0,0 +1,79 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl
+
+import ch.qos.logback.classic.Level
+import ch.qos.logback.classic.Logger
+import ch.qos.logback.classic.spi.ILoggingEvent
+import ch.qos.logback.core.read.ListAppender
+import org.slf4j.LoggerFactory
+import org.onap.cps.ncmp.api.models.datajob.DataJobReadRequest
+import org.onap.cps.ncmp.api.models.datajob.DataJobWriteRequest
+import org.onap.cps.ncmp.api.models.datajob.DataJobMetadata
+import org.onap.cps.ncmp.api.models.datajob.ReadOperation
+import org.onap.cps.ncmp.api.models.datajob.WriteOperation
+import spock.lang.Specification
+
+class DataJobServiceImplSpec extends Specification{
+
+    def objectUnderTest = new DataJobServiceImpl()
+
+    def logger = Spy(ListAppender<ILoggingEvent>)
+
+    def setup() {
+        setupLogger()
+    }
+
+    def cleanup() {
+        ((Logger) LoggerFactory.getLogger(DataJobServiceImpl.class)).detachAndStopAllAppenders()
+    }
+
+    def '#operation data job request.'() {
+        given: 'data job metadata'
+            def dataJobMetadata = new DataJobMetadata('client-topic', 'application/vnd.3gpp.object-tree-hierarchical+json', 'application/3gpp-json-patch+json')
+        when: 'read/write data job request is processed'
+            if (operation == 'read') {
+                objectUnderTest.readDataJob('some-job-id', dataJobMetadata, new DataJobReadRequest([getWriteOrReadOperationRequest(operation)]))
+            } else {
+                objectUnderTest.writeDataJob('some-job-id', dataJobMetadata, new DataJobWriteRequest([getWriteOrReadOperationRequest(operation)]))
+            }
+        then: 'the data job id is correctly logged'
+            def loggingEvent = logger.list[0]
+            assert loggingEvent.level == Level.INFO
+            assert loggingEvent.formattedMessage.contains('data job id for ' + operation + ' operation is: some-job-id')
+        where: 'the following data job operations are used'
+            operation << ['read', 'write']
+    }
+
+    def getWriteOrReadOperationRequest(operation) {
+        if (operation == 'write') {
+            return new WriteOperation('some/write/path', 'add', 'some-operation-id', 'some-value')
+        }
+        return new ReadOperation('some/read/path', 'read', 'some-operation-id', ['some-attrib-1'], ['some-field-1'], 'some-filter', 'some-scope-type', 1)
+    }
+
+    def setupLogger() {
+        def setupLogger = ((Logger) LoggerFactory.getLogger(DataJobServiceImpl.class))
+        setupLogger.setLevel(Level.DEBUG)
+        setupLogger.addAppender(logger)
+        logger.start()
+    }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy
index d47be6c..4d0af6f 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy
@@ -23,9 +23,6 @@
 
 package org.onap.cps.ncmp.api.impl
 
-import org.onap.cps.ncmp.api.models.DmiPluginRegistrationResponse
-import org.onap.cps.ncmp.api.models.CmResourceAddress
-
 import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME
 import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NCMP_DATASPACE_NAME
 import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NCMP_DMI_REGISTRY_ANCHOR
@@ -35,6 +32,8 @@
 import static org.onap.cps.ncmp.api.impl.operations.OperationType.CREATE
 import static org.onap.cps.ncmp.api.impl.operations.OperationType.UPDATE
 
+import org.onap.cps.ncmp.api.models.DmiPluginRegistrationResponse
+import org.onap.cps.ncmp.api.models.CmResourceAddress
 import org.onap.cps.ncmp.api.impl.utils.AlternateIdChecker
 import com.hazelcast.map.IMap
 import org.onap.cps.ncmp.api.NetworkCmProxyCmHandleQueryService
@@ -57,7 +56,6 @@
 import org.onap.cps.ncmp.api.models.DataOperationRequest
 import org.onap.cps.spi.exceptions.CpsException
 import org.onap.cps.spi.model.ConditionProperties
-
 import java.util.stream.Collectors
 import org.onap.cps.utils.JsonObjectMapper
 import com.fasterxml.jackson.databind.ObjectMapper
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy
index d5b0915..16f27d0 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2023 Nordix Foundation
+ *  Copyright (C) 2023-2024 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy
index 4f0132e..523ec76 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy
@@ -28,26 +28,32 @@
 import io.cloudevents.CloudEvent
 import io.cloudevents.core.builder.CloudEventBuilder
 import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus
 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent
+import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data
 import org.onap.cps.ncmp.utils.TestUtils
 import org.onap.cps.utils.JsonObjectMapper
 import org.slf4j.LoggerFactory
+import org.spockframework.spring.SpringBean
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.context.SpringBootTest
 
 @SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
 class CmNotificationSubscriptionDmiOutEventConsumerSpec extends MessagingBaseSpec {
 
-    def objectUnderTest = new CmNotificationSubscriptionDmiOutEventConsumer()
-    def logger = Spy(ListAppender<ILoggingEvent>)
-
     @Autowired
     JsonObjectMapper jsonObjectMapper
 
     @Autowired
     ObjectMapper objectMapper
 
+    @SpringBean
+    DmiCmNotificationSubscriptionCacheHandler mockDmiCmNotificationSubscriptionCacheHandler = Mock(DmiCmNotificationSubscriptionCacheHandler)
+
+    def objectUnderTest = new CmNotificationSubscriptionDmiOutEventConsumer(mockDmiCmNotificationSubscriptionCacheHandler)
+    def logger = Spy(ListAppender<ILoggingEvent>)
+
     void setup() {
         ((Logger) LoggerFactory.getLogger(CmNotificationSubscriptionDmiOutEventConsumer.class)).addAppender(logger)
         logger.start()
@@ -78,6 +84,29 @@
             assert loggingEvent.formattedMessage == 'Cm Subscription with id : sub-1 handled by the dmi-plugin : test-dmi-plugin-name has the status : accepted'
     }
 
+    def 'Consume a valid CM Notification Subscription Event and perform correct actions base on status'() {
+        given: 'a cmNotificationSubscription event'
+            def dmiOutEventData = new Data(statusCode: statusCode, statusMessage: subscriptionStatus.toString())
+            def cmNotificationSubscriptionDmiOutEvent = new CmNotificationSubscriptionDmiOutEvent().withData(dmiOutEventData)
+            def testCloudEventSent = CloudEventBuilder.v1()
+                .withData(objectMapper.writeValueAsBytes(cmNotificationSubscriptionDmiOutEvent))
+                .withId('random-uuid')
+                .withType('subscriptionCreateResponse')
+                .withSource(URI.create('test-dmi-plugin-name'))
+                .withExtension('correlationid', 'sub-1#test-dmi-plugin-name').build()
+            def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
+        when: 'the event is consumed'
+            objectUnderTest.consumeCmNotificationSubscriptionDmiOutEvent(consumerRecord)
+        then: 'correct number of calls to cache'
+            expectedCacheCalls * mockDmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi('sub-1','test-dmi-plugin-name', subscriptionStatus)
+        and: 'correct number of calls to persist cache'
+            expectedPersistenceCalls * mockDmiCmNotificationSubscriptionCacheHandler.persistIntoDatabasePerDmi('sub-1','test-dmi-plugin-name')
+        where: 'the following parameters are used'
+            scenario            | subscriptionStatus                            | statusCode || expectedCacheCalls | expectedPersistenceCalls
+            'Accepted Status'   | CmNotificationSubscriptionStatus.ACCEPTED     | '1'        || 1                  | 1
+            'Rejected Status'   | CmNotificationSubscriptionStatus.REJECTED     | '2'        || 1                  | 0
+    }
+
     def getLoggingEvent() {
         return logger.list[0]
     }
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
index 132c4bc..47a1c89 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
@@ -24,6 +24,8 @@
 import io.cloudevents.CloudEvent
 import io.cloudevents.core.builder.CloudEventBuilder
 import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService
 import org.onap.cps.ncmp.api.impl.inventory.InventoryPersistence
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
@@ -45,9 +47,11 @@
     ObjectMapper objectMapper
     @SpringBean
     InventoryPersistence mockInventoryPersistence = Mock(InventoryPersistence)
+    @SpringBean
+    CmNotificationSubscriptionPersistenceService mockCmNotificationSubscriptionPersistenceService = Mock(CmNotificationSubscriptionPersistenceService)
 
     def testCache = [:]
-    def objectUnderTest = new DmiCmNotificationSubscriptionCacheHandler(testCache, mockInventoryPersistence)
+    def objectUnderTest = new DmiCmNotificationSubscriptionCacheHandler(mockCmNotificationSubscriptionPersistenceService, testCache, mockInventoryPersistence)
 
     CmNotificationSubscriptionNcmpInEvent cmNotificationSubscriptionNcmpInEvent
     def yangModelCmHandle1 = new YangModelCmHandle(id:'ch1',dmiServiceName:'dmi-1')
@@ -62,9 +66,9 @@
 
     def 'Load CM subscription event to cache'() {
         given: 'a valid subscription event with Id'
-            def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId();
+            def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId()
         and: 'list of predicates'
-            def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates();
+            def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates()
         when: 'a valid event object loaded in cache'
             objectUnderTest.add(subscriptionId, predicates)
         then: 'the cache contains the correct entry with #subscriptionId subscription ID'
@@ -115,6 +119,29 @@
             assert mapOfCMHandleIDsByDmi.get('dmi-2') == ['ch2'].toSet()
     }
 
+    def 'Update subscription status in cache per DMI service name'() {
+        given: 'populated cache'
+            def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates()
+            def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId()
+            objectUnderTest.add(subscriptionId, predicates)
+        when: 'subscription status per dmi is updated in cache'
+            objectUnderTest.updateDmiCmNotificationSubscriptionStatusPerDmi(subscriptionId,'dmi-1', CmNotificationSubscriptionStatus.ACCEPTED)
+        then: 'verify status has been updated in cache'
+            def predicate = testCache.get(subscriptionId)
+            assert predicate.get('dmi-1').cmNotificationSubscriptionStatus == CmNotificationSubscriptionStatus.ACCEPTED
+    }
+
+    def 'Persist Cache into database per dmi'() {
+        given: 'populate cache'
+            def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates()
+            def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId()
+            objectUnderTest.add(subscriptionId, predicates)
+        when: 'subscription is persisted in database'
+            objectUnderTest.persistIntoDatabasePerDmi(subscriptionId,'dmi-1')
+        then: 'persistence service is called the correct number of times per dmi'
+            4 * mockCmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(_,_,_,subscriptionId)
+    }
+
     def setUpTestEvent(){
         def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
         def testEventSent = jsonObjectMapper.convertJsonString(jsonData, CmNotificationSubscriptionNcmpInEvent.class)
diff --git a/docs/cps-ncmp-message-status-codes.rst b/docs/cps-ncmp-message-status-codes.rst
index 90590a2..018cf4a 100644
--- a/docs/cps-ncmp-message-status-codes.rst
+++ b/docs/cps-ncmp-message-status-codes.rst
@@ -40,6 +40,8 @@
     +-----------------+------------------------------------------------------+-----------------------------------+
     | 111             | alternate id already associated                      | Inventory                         |
     +-----------------+------------------------------------------------------+-----------------------------------+
+    | 112             | message too large                                    | Data Operation                    |
+    +-----------------+------------------------------------------------------+-----------------------------------+
 
 .. note::