NCMP : NCMP : Handle non responding DMI-Plugin

- Added new response code for non-responding dmi and non-success.
- Captured any exception after sending request to dmi service and then
  create a cloud event with error code and publish it to client topic.
- Minor modificarion into resource data operation util class.

Issue-ID: CPS-1558

Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
Change-Id: I39d409fb42856b816bf9833c2a23f7fa250a1e62
Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpEventResponseCode.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpEventResponseCode.java
index 9f7ef1e..42d8135 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpEventResponseCode.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpEventResponseCode.java
@@ -25,8 +25,10 @@
 @Getter
 public enum NcmpEventResponseCode {
 
-    CODE_100("100", "cm handle id(s) not found"),
-    CODE_101("101", "cm handle(s) not ready");
+    CM_HANDLES_NOT_FOUND("100", "cm handle id(s) not found"),
+    CM_HANDLES_NOT_READY("101", "cm handle(s) not ready"),
+    DMI_SERVICE_NOT_RESPONDING("102", "dmi plugin service is not responding"),
+    UNABLE_TO_READ_RESOURCE_DATA("103", "dmi plugin service is not able to read resource data");
 
     private final String statusCode;
     private final String statusMessage;
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java
index b4784f4..8f0975f 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/operations/DmiDataOperations.java
@@ -30,8 +30,10 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.api.NcmpEventResponseCode;
 import org.onap.cps.ncmp.api.impl.client.DmiRestClient;
 import org.onap.cps.ncmp.api.impl.config.NcmpConfiguration;
+import org.onap.cps.ncmp.api.impl.exception.HttpClientRequestException;
 import org.onap.cps.ncmp.api.impl.executor.TaskExecutor;
 import org.onap.cps.ncmp.api.impl.utils.DmiServiceUrlBuilder;
 import org.onap.cps.ncmp.api.impl.utils.data.operation.ResourceDataOperationRequestUtils;
@@ -43,7 +45,9 @@
 import org.onap.cps.utils.JsonObjectMapper;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Component;
+import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
+import org.springframework.web.util.UriComponentsBuilder;
 
 /**
  * Operations class for DMI data.
@@ -240,14 +244,37 @@
         final String dataOperationRequestBodiesAsJsonString =
                 jsonObjectMapper.asJsonString(dmiDataOperationRequestBodies);
         TaskExecutor.executeTask(() -> dmiRestClient.postOperationWithJsonData(dataOperationResourceUrl,
-                        dataOperationRequestBodiesAsJsonString, READ),
+                                dataOperationRequestBodiesAsJsonString, READ),
                         DEFAULT_ASYNC_TASK_EXECUTOR_TIMEOUT_IN_MILLISECONDS)
-                .whenCompleteAsync(this::handleTaskCompletion);
+                .whenCompleteAsync((response, throwable) -> handleTaskCompletionException(throwable,
+                        dataOperationResourceUrl, dmiDataOperationRequestBodies));
     }
 
-    private void handleTaskCompletion(final Object response, final Throwable throwable) {
-        // TODO Need to publish an error response to client given topic.
-        //  Code should be implemented into https://jira.onap.org/browse/CPS-1558 (
-        //  NCMP : Handle non responding DMI-Plugin)
+    private void handleTaskCompletionException(final Throwable throwable,
+                                               final String dataOperationResourceUrl,
+                                               final List<DmiDataOperation> dmiDataOperationRequestBodies) {
+        if (throwable != null) {
+            final MultiValueMap<String, String> dataOperationResourceUrlParameters =
+                    UriComponentsBuilder.fromUriString(dataOperationResourceUrl).build().getQueryParams();
+            final String topicName = dataOperationResourceUrlParameters.get("topic").get(0);
+            final String requestId = dataOperationResourceUrlParameters.get("requestId").get(0);
+
+            final MultiValueMap<String, Map<NcmpEventResponseCode, List<String>>>
+                    cmHandleIdsPerResponseCodesPerOperationId = new LinkedMultiValueMap<>();
+
+            dmiDataOperationRequestBodies.forEach(dmiDataOperationRequestBody -> {
+                final List<String> cmHandleIds = dmiDataOperationRequestBody.getCmHandles().stream()
+                        .map(CmHandle::getId).collect(Collectors.toList());
+                if (throwable.getCause() instanceof HttpClientRequestException) {
+                    cmHandleIdsPerResponseCodesPerOperationId.add(dmiDataOperationRequestBody.getOperationId(),
+                            Map.of(NcmpEventResponseCode.UNABLE_TO_READ_RESOURCE_DATA, cmHandleIds));
+                } else {
+                    cmHandleIdsPerResponseCodesPerOperationId.add(dmiDataOperationRequestBody.getOperationId(),
+                            Map.of(NcmpEventResponseCode.DMI_SERVICE_NOT_RESPONDING, cmHandleIds));
+                }
+            });
+            ResourceDataOperationRequestUtils.publishErrorMessageToClientTopic(topicName, requestId,
+                    cmHandleIdsPerResponseCodesPerOperationId);
+        }
     }
 }
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
index 957f48a..d8fb904 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/data/operation/ResourceDataOperationRequestUtils.java
@@ -68,7 +68,7 @@
             final Collection<YangModelCmHandle> yangModelCmHandles) {
 
         final Map<String, List<DmiDataOperation>> dmiDataOperationsOutPerDmiServiceName = new HashMap<>();
-        final MultiValueMap<String, Map<NcmpEventResponseCode, List<String>>> cmHandleIdsPerOperationIdPerResponseCode
+        final MultiValueMap<String, Map<NcmpEventResponseCode, List<String>>> cmHandleIdsPerResponseCodesPerOperationId
                 = new LinkedMultiValueMap<>();
         final Set<String> nonReadyCmHandleIdsLookup = filterAndGetNonReadyCmHandleIds(yangModelCmHandles);
 
@@ -100,25 +100,34 @@
                     }
                 }
             }
-            populateCmHandleIdsPerOperationIdPerResponseCode(cmHandleIdsPerOperationIdPerResponseCode,
-                    dataOperationDefinitionIn.getOperationId(), NcmpEventResponseCode.CODE_100, nonExistingCmHandleIds);
-            populateCmHandleIdsPerOperationIdPerResponseCode(cmHandleIdsPerOperationIdPerResponseCode,
-                    dataOperationDefinitionIn.getOperationId(), NcmpEventResponseCode.CODE_101, nonReadyCmHandleIds);
+            populateCmHandleIdsPerOperationIdPerResponseCode(cmHandleIdsPerResponseCodesPerOperationId,
+                    dataOperationDefinitionIn.getOperationId(), NcmpEventResponseCode.CM_HANDLES_NOT_FOUND,
+                    nonExistingCmHandleIds);
+            populateCmHandleIdsPerOperationIdPerResponseCode(cmHandleIdsPerResponseCodesPerOperationId,
+                    dataOperationDefinitionIn.getOperationId(), NcmpEventResponseCode.CM_HANDLES_NOT_READY,
+                    nonReadyCmHandleIds);
         }
-        if (!cmHandleIdsPerOperationIdPerResponseCode.isEmpty()) {
-            publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerOperationIdPerResponseCode);
+        if (!cmHandleIdsPerResponseCodesPerOperationId.isEmpty()) {
+            publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleIdsPerResponseCodesPerOperationId);
         }
         return dmiDataOperationsOutPerDmiServiceName;
     }
 
+    /**
+     * Creates data operation cloud event and publish it to client topic.
+     *
+     * @param clientTopic                              client given topic
+     * @param requestId                                unique identifier per request
+     * @param cmHandleIdsPerResponseCodesPerOperationId list of cm handle ids per operation id with response code
+     */
     @Async
-    private static void publishErrorMessageToClientTopic(final String clientTopic,
+    public static void publishErrorMessageToClientTopic(final String clientTopic,
                                                          final String requestId,
                                                          final MultiValueMap<String,
                                                                  Map<NcmpEventResponseCode, List<String>>>
-                                                                 cmHandleIdsPerOperationIdPerResponseCode) {
+                                                                    cmHandleIdsPerResponseCodesPerOperationId) {
         final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
-                requestId, cmHandleIdsPerOperationIdPerResponseCode);
+                requestId, cmHandleIdsPerResponseCodesPerOperationId);
         final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class);
         eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
     }
@@ -166,13 +175,13 @@
     }
 
     private static void populateCmHandleIdsPerOperationIdPerResponseCode(final MultiValueMap<String,
-            Map<NcmpEventResponseCode, List<String>>> cmHandleIdsPerOperationIdByResponseCode,
+            Map<NcmpEventResponseCode, List<String>>> cmHandleIdsPerResponseCodesPerOperationId,
                                                                         final String operationId,
                                                                         final NcmpEventResponseCode
                                                                                 ncmpEventResponseCode,
                                                                         final List<String> cmHandleIds) {
         if (!cmHandleIds.isEmpty()) {
-            cmHandleIdsPerOperationIdByResponseCode.add(operationId, Map.of(ncmpEventResponseCode, cmHandleIds));
+            cmHandleIdsPerResponseCodesPerOperationId.add(operationId, Map.of(ncmpEventResponseCode, cmHandleIds));
         }
     }
 }
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy
index 59e62e3..3f40f43 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/operations/DmiDataOperationsSpec.groovy
@@ -22,12 +22,16 @@
 package org.onap.cps.ncmp.api.impl.operations
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.core.CloudEventUtils
+import io.cloudevents.jackson.PojoCloudEventDataMapper
+import org.onap.cps.ncmp.api.NcmpEventResponseCode
 import org.onap.cps.ncmp.api.impl.config.NcmpConfiguration
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
+import org.onap.cps.ncmp.api.impl.exception.HttpClientRequestException
 import org.onap.cps.ncmp.api.impl.utils.DmiServiceUrlBuilder
 import org.onap.cps.ncmp.api.impl.utils.context.CpsApplicationContext
 import org.onap.cps.ncmp.api.models.DataOperationRequest
-import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent
+import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
 import org.onap.cps.ncmp.utils.TestUtils
 import org.onap.cps.utils.JsonObjectMapper
 import org.spockframework.spring.SpringBean
@@ -37,6 +41,7 @@
 import org.springframework.test.context.ContextConfiguration
 import org.springframework.http.HttpStatus
 import spock.lang.Shared
+import java.util.concurrent.TimeoutException
 
 import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL
 import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING
@@ -110,6 +115,28 @@
             assert requestBodyAsJsonStringArg == expectedBatchRequestAsJson
     }
 
+    def 'Execute (async) data operation from DMI service for #scenario.'() {
+        given: 'data operation request body and dmi resource url'
+            def dmiDataOperation = DmiDataOperation.builder().operationId('some-operation-id').build()
+            dmiDataOperation.getCmHandles().add(CmHandle.builder().id('some-cm-handle-id').build())
+            def dmiDataOperationResourceDataUrl = "http://dmi-service-name:dmi-port/dmi/v1/data?topic=my-topic-name&requestId=some-request-id"
+            def actualDataOperationCloudEvent = null
+        when: 'exception occurs after sending request to dmi service'
+            objectUnderTest.handleTaskCompletionException(new Throwable(exception), dmiDataOperationResourceDataUrl, List.of(dmiDataOperation))
+        then: 'a cloud event is published'
+            eventsPublisher.publishCloudEvent('my-topic-name', 'some-request-id', _) >> { args -> actualDataOperationCloudEvent = args[2] }
+        and: 'the event contains the expected error details'
+            def eventDataValue = extractDataValue(actualDataOperationCloudEvent)
+            assert eventDataValue.operationId == dmiDataOperation.operationId
+            assert eventDataValue.ids == dmiDataOperation.cmHandles.id
+            assert eventDataValue.statusCode == responseCode.statusCode
+            assert eventDataValue.statusMessage == responseCode.statusMessage
+        where: 'the following exceptions are occurred'
+            scenario                        | exception                                                                                                || responseCode
+            'http client request exception' | new HttpClientRequestException('error-message', 'error-details', HttpStatus.SERVICE_UNAVAILABLE.value()) || NcmpEventResponseCode.UNABLE_TO_READ_RESOURCE_DATA
+            'timeout exception'             | new TimeoutException()                                                                                   || NcmpEventResponseCode.DMI_SERVICE_NOT_RESPONDING
+    }
+
     def 'call get all resource data.'() {
         given: 'the system returns a cm handle with a sample property'
             mockYangModelCmHandleRetrieval([yangModelCmHandleProperty])
@@ -142,4 +169,8 @@
             CREATE    || 'create'
             UPDATE    || 'update'
     }
+
+    def extractDataValue(actualDataOperationCloudEvent) {
+        return CloudEventUtils.mapData(actualDataOperationCloudEvent, PojoCloudEventDataMapper.from(new ObjectMapper(), DataOperationEvent.class)).getValue().data.responses[0]
+    }
 }