Merge "Rearrange CSIT test order"
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java
index 50d96f8..9cfc49f 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/DataJobStatusService.java
@@ -31,15 +31,13 @@
      *
      * @param authorization     The authorization header from the REST request.
      * @param dmiServiceName    The name of the DMI Service relevant to the data job.
-     * @param requestId         The unique identifier for the overall data job request.
-     * @param dataProducerJobId The identifier of the data producer job within the DMI system.
      * @param dataProducerId    The ID of the producer registered by DMI, used for operations related to this request.
      *                          This could include alternate IDs or specific identifiers.
+     * @param dataProducerJobId The identifier of the data producer job within the DMI system.
      * @return The current status of the data job as a String.
      */
     String getDataJobStatus(final String authorization,
                             final String dmiServiceName,
-                            final String requestId,
-                            final String dataProducerJobId,
-                            final String dataProducerId);
+                            final String dataProducerId,
+                            final String dataProducerJobId);
 }
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/models/SubJobWriteRequest.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/models/SubJobWriteRequest.java
index 0e0498e..a7a6573 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/models/SubJobWriteRequest.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/datajobs/models/SubJobWriteRequest.java
@@ -25,17 +25,20 @@
 /**
  * Request data for a write operation by the DMI Plugin.
  *
+ * @param destination     The destination of the results. ( e.g. S3 Bucket)
  * @param dataAcceptType  Define the data response accept type.
  *                        e.g. "application/vnd.3gpp.object-tree-hierarchical+json",
  *                        "application/vnd.3gpp.object-tree-flat+json" etc.
  * @param dataContentType Define the data request content type.
  *                        e.g. "application/3gpp-json-patch+json" etc.
  * @param dataProducerId  Identifier of the data producer.
- *
+ * @param dataJobId       Identifier for the overall Datajob
  * @param data            A collection of outgoing write operations.
  */
 public record SubJobWriteRequest (
+        String destination,
         String dataAcceptType,
         String dataContentType,
         String dataProducerId,
+        String dataJobId,
         Collection<DmiWriteOperation> data) {}
\ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java
index a6ecaa1..fb17f06 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImpl.java
@@ -42,26 +42,25 @@
     @Override
     public String getDataJobStatus(final String authorization,
                                    final String dmiServiceName,
-                                   final String requestId,
-                                   final String dataProducerJobId,
-                                   final String dataProducerId) {
+                                   final String dataProducerId,
+                                   final String dataProducerJobId) {
 
-        final UrlTemplateParameters urlTemplateParameters = buildUrlParameters(dmiServiceName, requestId,
-                dataProducerJobId, dataProducerId);
+        final UrlTemplateParameters urlTemplateParameters = buildUrlParameters(dmiServiceName,
+                                                                              dataProducerId,
+                                                                              dataProducerJobId);
         return dmiRestClient.getDataJobStatus(urlTemplateParameters, authorization).block();
     }
 
     private UrlTemplateParameters buildUrlParameters(final String dmiServiceName,
-                                                     final String requestId,
-                                                     final String dataProducerJobId,
-                                                     final String dataProducerId) {
+                                                     final String dataProducerId,
+                                                     final String dataProducerJobId) {
         return DmiServiceUrlTemplateBuilder.newInstance()
-                .fixedPathSegment("dataJob")
-                .variablePathSegment("requestId", requestId)
+                .fixedPathSegment("cmwriteJob")
+                .fixedPathSegment("dataProducer")
+                .variablePathSegment("dataProducerId", dataProducerId)
                 .fixedPathSegment("dataProducerJob")
                 .variablePathSegment("dataProducerJobId", dataProducerJobId)
                 .fixedPathSegment("status")
-                .queryParameter("dataProducerId", dataProducerId)
                 .createUrlTemplateParameters(dmiServiceName, dmiProperties.getDmiBasePath());
     }
 }
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java
index c93709c..0d14dac 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandler.java
@@ -64,10 +64,15 @@
                                      final Map<ProducerKey, List<DmiWriteOperation>> dmiWriteOperationsPerProducerKey) {
         final List<SubJobWriteResponse> subJobWriteResponses = new ArrayList<>(dmiWriteOperationsPerProducerKey.size());
         dmiWriteOperationsPerProducerKey.forEach((producerKey, dmi3ggpWriteOperations) -> {
-            final SubJobWriteRequest subJobWriteRequest = new SubJobWriteRequest(dataJobMetadata.dataAcceptType(),
-                    dataJobMetadata.dataContentType(), producerKey.dataProducerIdentifier(), dmi3ggpWriteOperations);
+            final SubJobWriteRequest subJobWriteRequest = new SubJobWriteRequest(dataJobMetadata.destination(),
+                                                                                 dataJobMetadata.dataAcceptType(),
+                                                                                 dataJobMetadata.dataContentType(),
+                                                                                 producerKey.dataProducerIdentifier(),
+                                                                                 dataJobId,
+                                                                                 dmi3ggpWriteOperations);
 
-            final UrlTemplateParameters urlTemplateParameters = getUrlTemplateParameters(dataJobId, producerKey);
+            final UrlTemplateParameters urlTemplateParameters = getUrlTemplateParameters(dataJobMetadata.destination(),
+                                                                                         producerKey);
             final ResponseEntity<Object> responseEntity = dmiRestClient.synchronousPostOperationWithJsonData(
                     RequiredDmiService.DATA,
                     urlTemplateParameters,
@@ -82,10 +87,10 @@
         return subJobWriteResponses;
     }
 
-    private UrlTemplateParameters getUrlTemplateParameters(final String dataJobId, final ProducerKey producerKey) {
+    private UrlTemplateParameters getUrlTemplateParameters(final String destination, final ProducerKey producerKey) {
         return DmiServiceUrlTemplateBuilder.newInstance()
-                .fixedPathSegment("writeJob")
-                .variablePathSegment("requestId", dataJobId)
+                .fixedPathSegment("cmwriteJob")
+                .queryParameter("destination", destination)
                 .createUrlTemplateParameters(producerKey.dmiServiceName(), dmiProperties.getDmiBasePath());
     }
 }
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java
index ba6bba9..177b4b0 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/dmi/DmiRestClient.java
@@ -160,7 +160,7 @@
                 .headers(httpHeaders -> configureHttpHeaders(httpHeaders, authorization))
                 .retrieve()
                 .bodyToMono(JsonNode.class)
-                .map(responseHealthStatus -> responseHealthStatus.path("status").asText())
+                .map(jsonNode -> jsonNode.path("status").asText())
                 .onErrorMap(throwable -> handleDmiClientException(throwable, OperationType.READ.getOperationName()));
     }
 
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy
index cc04298..d231dfa 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DataJobStatusServiceImplSpec.groovy
@@ -39,15 +39,14 @@
     def 'Forward a data job status query to DMI.' () {
         given: 'the required parameters for querying'
             def dmiServiceName = 'some-dmi-service'
-            def requestId = 'some-request-id'
+            def dataProducerId = 'some-data-producer-id'
             def dataProducerJobId = 'some-data-producer-job-id'
-            def dataJobId = 'some-data-job-id'
             def authorization = 'my authorization header'
-            def urlParams = new UrlTemplateParameters('some-dmi-service/dmi/v1/dataJob/{requestId}/dataProducerJob/{dataProducerJobId}/status?dataProducerId={dataProducerId}', ['dataProducerJobId':'some-data-producer-job-id', 'dataProducerId':'some-data-job-id', 'requestId':'some-request-id'])
+            def urlParams = new UrlTemplateParameters('some-dmi-service/dmi/v1/cmwriteJob/dataProducer/{dataProducerId}/dataProducerJob/{dataProducerJobId}/status', ['dataProducerId':'some-data-producer-id', 'dataProducerJobId':'some-data-producer-job-id'])
         and: 'the rest client returns a status for the given parameters'
             mockDmiRestClient.getDataJobStatus(urlParams, authorization) >> Mono.just('some status')
         when: 'the job status is queried'
-            def status = objectUnderTest.getDataJobStatus(authorization, dmiServiceName, requestId, dataProducerJobId, dataJobId)
+            def status = objectUnderTest.getDataJobStatus(authorization, dmiServiceName, dataProducerId, dataProducerJobId)
         then: 'the status from the rest client is returned'
             assert status == 'some status'
     }
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy
index b3dd02d..041fbd9 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/DmiSubJobRequestHandlerSpec.groovy
@@ -30,7 +30,7 @@
             def authorization = 'my authorization header'
         and: 'the dmi rest client will return a response (for the correct parameters)'
             def responseEntity = new ResponseEntity<>(new SubJobWriteResponse('my-sub-job-id', 'dmi1', 'prod1'), HttpStatus.OK)
-            def expectedJson = '{"dataAcceptType":"t1","dataContentType":"t2","dataProducerId":"prod1","data":[{"path":"p","op":"operation","moduleSetTag":"tag","value":null,"operationId":"o1","privateProperties":{}}]}'
+            def expectedJson = '{"destination":"d1","dataAcceptType":"t1","dataContentType":"t2","dataProducerId":"prod1","dataJobId":"some-job-id","data":[{"path":"p","op":"operation","moduleSetTag":"tag","value":null,"operationId":"o1","privateProperties":{}}]}'
             mockDmiRestClient.synchronousPostOperationWithJsonData(RequiredDmiService.DATA, _, expectedJson, OperationType.CREATE, authorization) >> responseEntity
         when: 'sending request to DMI invoked'
             objectUnderTest.sendRequestsToDmi(authorization, dataJobId, dataJobMetadata, dmiWriteOperationsPerProducerKey)
diff --git a/csit/prepare-csit.sh b/csit/prepare-csit.sh
index fbd5dc5..1b8578e 100755
--- a/csit/prepare-csit.sh
+++ b/csit/prepare-csit.sh
@@ -71,7 +71,7 @@
 python3 --version
 
 echo "Installing confluent kafka library for robot framework:"
-pip install robotframework-confluentkafkalibrary
+pip install robotframework-confluentkafkalibrary==2.4.0-2
 
 pip freeze
 python3 -m robot.run --version || :
\ No newline at end of file
diff --git a/csit/pylibs.txt b/csit/pylibs.txt
index 32bfa6f..3eeb1ab 100644
--- a/csit/pylibs.txt
+++ b/csit/pylibs.txt
@@ -9,7 +9,7 @@
 robotframework-selenium2library==3.0.0
 robotframework-extendedselenium2library
 robotframework-sshlibrary
-robotframework-confluentkafkalibrary
+robotframework-confluentkafkalibrary==2.4.0-2
 scapy
 # Module jsonpath is needed by current AAA idmlite suite.
 jsonpath-rw
diff --git a/csit/tests/cps-data-operations/cps-data-operations.robot b/csit/tests/cps-data-operations/cps-data-operations.robot
index 8f1d71a..96212ff 100644
--- a/csit/tests/cps-data-operations/cps-data-operations.robot
+++ b/csit/tests/cps-data-operations/cps-data-operations.robot
@@ -64,8 +64,7 @@
         Compare Header Values       ${header_key_value_pair[0]}   ${header_key_value_pair[1]}      "ce_specversion"      "1.0"
         Compare Header Values       ${header_key_value_pair[0]}   ${header_key_value_pair[1]}      "ce_type"             "org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"
         Compare Header Values       ${header_key_value_pair[0]}   ${header_key_value_pair[1]}      "ce_correlationid"    "${expectedRequestId}"
-        # Need to check the root cause of this failure. To be investigated separately as part of CPS-2363
-        # Compare Header Values       ${header_key_value_pair[0]}   ${header_key_value_pair[1]}      "ce_source"           "DMI"
+        Compare Header Values       ${header_key_value_pair[0]}   ${header_key_value_pair[1]}      "ce_source"           "DMI"
     END
     [Teardown]                      Basic Teardown                    ${group_id}
 
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/base/DmiDispatcher.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/base/DmiDispatcher.groovy
index 5ce2475..56d8f19 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/base/DmiDispatcher.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/base/DmiDispatcher.groovy
@@ -20,6 +20,8 @@
 
 package org.onap.cps.integration.base
 
+import org.onap.cps.ncmp.api.datajobs.models.SubJobWriteRequest
+
 import static org.onap.cps.integration.base.CpsIntegrationSpecBase.readResourceDataFile
 
 import groovy.json.JsonSlurper
@@ -91,23 +93,23 @@
             case ~'^/dmi/v1/data$':
                 return mockResponseWithBody(HttpStatus.ACCEPTED, '{}')
 
-            // get write sub job response
-            case ~'^/dmi/v1/writeJob/(.*)$':
-                return mockWriteJobResponse(request)
-
             // get data job status
-            case ~'^/dmi/v1/dataJob/(.*)/dataProducerJob/(.*)/status(.*)$':
+            case ~'^/dmi/v1/cmwriteJob/dataProducer/(.*)/dataProducerJob/(.*)/status$':
                 return mockResponseWithBody(HttpStatus.OK, '{"status":"status details from mock service"}')
 
+            // get write sub job response
+            case ~'^/dmi/v1/cmwriteJob(.*)$':
+                return mockWriteJobResponse(request)
+
             default:
                 throw new IllegalArgumentException('Mock DMI does not implement endpoint ' + request.path)
         }
     }
 
     def mockWriteJobResponse(request) {
-        def requestId = Matcher.lastMatcher[0][1]
+        def destination = Matcher.lastMatcher[0][1]
         def subJobWriteRequest = jsonSlurper.parseText(request.getBody().readUtf8())
-        this.receivedSubJobs.put(requestId, subJobWriteRequest)
+        this.receivedSubJobs.put(destination, subJobWriteRequest)
         def response = '{"subJobId":"some sub job id", "dmiServiceName":"some dmi service name", "dataProducerId":"some data producer id"}'
         return mockResponseWithBody(HttpStatus.OK, response)
     }
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/DataJobStatusServiceSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/DataJobStatusServiceSpec.groovy
index fdcad2b..6e5c0e4 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/DataJobStatusServiceSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/DataJobStatusServiceSpec.groovy
@@ -12,12 +12,11 @@
     def 'Get the status of a data job from DMI.'() {
         given: 'the required data about the data job'
             def dmiServiceName = DMI1_URL
-            def requestId = 'some-request-id'
-            def dataProducerJobId = 'some-data-producer-job-id'
             def dataProducerId = 'some-data-producer-id'
+            def dataProducerJobId = 'some-data-producer-job-id'
             def authorization = 'my authorization header'
         when: 'the data job status checked'
-            def result = dataJobStatusService.getDataJobStatus(authorization, dmiServiceName, requestId, dataProducerJobId, dataProducerId)
+            def result = dataJobStatusService.getDataJobStatus(authorization, dmiServiceName, dataProducerId, dataProducerJobId)
         then: 'the status is that defined in the mock service.'
             assert result == 'status details from mock service'
     }
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/WriteSubJobSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/WriteSubJobSpec.groovy
index b73634f..834e139 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/WriteSubJobSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/WriteSubJobSpec.groovy
@@ -52,7 +52,7 @@
         given: 'the required input data for the write job'
             def authorization = 'my authorization header'
             def dataJobWriteRequest = new DataJobWriteRequest([new WriteOperation('p1', '', '', null), new WriteOperation('p2', '', '', null), new WriteOperation('p3', '', '', null)])
-            def myDataJobMetadata = new DataJobMetadata('', '', '')
+            def myDataJobMetadata = new DataJobMetadata('d1', '', '')
             def dataJobId = 'my-data-job-id'
         when: 'sending a write job to NCMP with 2 sub-jobs for DMI 1 and 1 sub-job for DMI 2'
             def response = dataJobService.writeDataJob(authorization, dataJobId, myDataJobMetadata, dataJobWriteRequest)
@@ -63,12 +63,12 @@
             assert response[0].dmiServiceName == "some dmi service name"
             assert response[0].dataProducerId == "some data producer id"
         and: 'dmi 1 received the correct job details'
-            def receivedSubJobsForDispatcher1 = dmiDispatcher1.receivedSubJobs['my-data-job-id']['data'].collect()
+            def receivedSubJobsForDispatcher1 = dmiDispatcher1.receivedSubJobs['?destination=d1']['data'].collect()
             assert receivedSubJobsForDispatcher1.size() == 2
             assert receivedSubJobsForDispatcher1[0]['path'] == 'p1'
             assert receivedSubJobsForDispatcher1[1]['path'] == 'p2'
         and: 'dmi 2 received the correct job details'
-            def receivedSubJobsForDispatcher2 = dmiDispatcher2.receivedSubJobs['my-data-job-id']['data'].collect()
+            def receivedSubJobsForDispatcher2 = dmiDispatcher2.receivedSubJobs['?destination=d1']['data'].collect()
             assert receivedSubJobsForDispatcher2.size() == 1
             assert receivedSubJobsForDispatcher2[0]['path'] == 'p3'
     }