Merge "Cm Subscription: Remove subscription method"
diff --git a/cps-ncmp-rest/docs/openapi/components.yaml b/cps-ncmp-rest/docs/openapi/components.yaml
index 0ad453a..8aa38c0 100644
--- a/cps-ncmp-rest/docs/openapi/components.yaml
+++ b/cps-ncmp-rest/docs/openapi/components.yaml
@@ -366,6 +366,7 @@
type: array
items:
type: string
+ description: targeted cm handles, maximum of 50 supported. If this limit is exceeded the request wil be refused.
example: [ "da310eecdb8d44c2acc0ddaae01174b1","c748c58f8e0b438f9fd1f28370b17d47" ]
examples:
@@ -695,7 +696,7 @@
schema:
$ref: '#/components/schemas/ErrorMessage'
example:
- status: 400 BAD_REQUEST
+ status: 400
message: Bad request error message
details: Bad request error details
Conflict:
@@ -705,9 +706,19 @@
schema:
$ref: '#/components/schemas/ErrorMessage'
example:
- status: 409 CONFLICT
+ status: 409
message: Conflict error message
details: Conflict error details
+ PayloadTooLarge:
+ description: The request is larger than the server is willing or able to process
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ErrorMessage'
+ example:
+ status: 413
+ message: Payload Too Large error message
+ details: Payload Too Large error details
NotImplemented:
description: The given path has not been implemented
content:
diff --git a/cps-ncmp-rest/docs/openapi/ncmp.yml b/cps-ncmp-rest/docs/openapi/ncmp.yml
index 0cb1cdf..d0b1f35 100755
--- a/cps-ncmp-rest/docs/openapi/ncmp.yml
+++ b/cps-ncmp-rest/docs/openapi/ncmp.yml
@@ -194,7 +194,7 @@
tags:
- network-cm-proxy
summary: Execute a data operation for group of cm handle ids
- description: This request will be handled asynchronously using messaging to the supplied topic. The rest response will be an acknowledge with a requestId to identify the relevant messages.
+ description: This request will be handled asynchronously using messaging to the supplied topic. The rest response will be an acknowledge with a requestId to identify the relevant messages. A maximum of 50 cm handles per operation is supported.
operationId: executeDataOperationForCmHandles
parameters:
- $ref: 'components.yaml#/components/parameters/requiredTopicParamInQuery'
@@ -216,6 +216,8 @@
$ref: 'components.yaml#/components/responses/BadRequest'
403:
$ref: 'components.yaml#/components/responses/Forbidden'
+ 413:
+ $ref: 'components.yaml#/components/responses/PayloadTooLarge'
500:
$ref: 'components.yaml#/components/responses/InternalServerError'
502:
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
index 75112ca..eca7ebf 100644
--- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/handlers/NcmpPassthroughResourceRequestHandler.java
@@ -33,6 +33,7 @@
import org.onap.cps.ncmp.api.models.CmResourceAddress;
import org.onap.cps.ncmp.api.models.DataOperationRequest;
import org.onap.cps.ncmp.rest.exceptions.OperationNotSupportedException;
+import org.onap.cps.ncmp.rest.exceptions.PayloadTooLargeException;
import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor;
import org.onap.cps.ncmp.rest.util.TopicValidator;
import org.springframework.http.ResponseEntity;
@@ -45,6 +46,10 @@
private static final Object noReturn = null;
+ private static final int MAXIMUM_CM_HANDLES_PER_OPERATION = 50;
+
+ private static final String PAYLOAD_TOO_LARGE_TEMPLATE = "Operation '%s' affects too many (%d) cm handles";
+
/**
* Constructor.
*
@@ -101,17 +106,23 @@
}
private void validateDataOperationRequest(final String topicParamInQuery,
- final DataOperationRequest
- dataOperationRequest) {
+ final DataOperationRequest dataOperationRequest) {
TopicValidator.validateTopicName(topicParamInQuery);
dataOperationRequest.getDataOperationDefinitions().forEach(dataOperationDetail -> {
if (OperationType.fromOperationName(dataOperationDetail.getOperation()) != READ) {
throw new OperationNotSupportedException(
dataOperationDetail.getOperation() + " operation not yet supported");
- } else if (DatastoreType.fromDatastoreName(dataOperationDetail.getDatastore()) == OPERATIONAL) {
+ }
+ if (DatastoreType.fromDatastoreName(dataOperationDetail.getDatastore()) == OPERATIONAL) {
throw new InvalidDatastoreException(dataOperationDetail.getDatastore()
+ " datastore is not supported");
}
+ if (dataOperationDetail.getCmHandleIds().size() > MAXIMUM_CM_HANDLES_PER_OPERATION) {
+ final String errorMessage = String.format(PAYLOAD_TOO_LARGE_TEMPLATE,
+ dataOperationDetail.getOperationId(),
+ dataOperationDetail.getCmHandleIds().size());
+ throw new PayloadTooLargeException(errorMessage);
+ }
});
}
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandler.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandler.java
index 7498c5f..d323691 100755
--- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandler.java
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandler.java
@@ -1,7 +1,7 @@
/*
* ============LICENSE_START=======================================================
* Copyright (C) 2021 Pantheon.tech
- * Modifications Copyright (C) 2021-2023 Nordix Foundation
+ * Modifications Copyright (C) 2021-2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -60,8 +60,7 @@
* @return response with response code 500.
*/
@ExceptionHandler
- public static ResponseEntity<Object> handleInternalServerErrorExceptions(
- final Exception exception) {
+ public static ResponseEntity<Object> handleInternalServerErrorExceptions(final Exception exception) {
return buildErrorResponse(HttpStatus.INTERNAL_SERVER_ERROR, exception);
}
@@ -73,7 +72,7 @@
@ExceptionHandler({HttpClientRequestException.class})
public static ResponseEntity<Object> handleClientRequestExceptions(
final HttpClientRequestException httpClientRequestException) {
- return wrapDmiErrorResponse(HttpStatus.BAD_GATEWAY, httpClientRequestException);
+ return wrapDmiErrorResponse(httpClientRequestException);
}
@ExceptionHandler({DmiRequestException.class, DataValidationException.class, OperationNotSupportedException.class,
@@ -88,10 +87,15 @@
}
@ExceptionHandler({DataNodeNotFoundException.class})
- public static ResponseEntity<Object> handleNotFoundExceptions(final CpsException exception) {
+ public static ResponseEntity<Object> handleNotFoundExceptions(final Exception exception) {
return buildErrorResponse(HttpStatus.NOT_FOUND, exception);
}
+ @ExceptionHandler({PayloadTooLargeException.class})
+ public static ResponseEntity<Object> handlePayloadTooLargeExceptions(final Exception exception) {
+ return buildErrorResponse(HttpStatus.PAYLOAD_TOO_LARGE, exception);
+ }
+
private static ResponseEntity<Object> buildErrorResponse(final HttpStatus status, final Exception exception) {
if (exception.getCause() != null || !(exception instanceof CpsException)) {
log.error("Exception occurred", exception);
@@ -111,15 +115,14 @@
return new ResponseEntity<>(errorMessage, status);
}
- private static ResponseEntity<Object> wrapDmiErrorResponse(
- final HttpStatus httpStatus,
- final HttpClientRequestException httpClientRequestException) {
+ private static ResponseEntity<Object> wrapDmiErrorResponse(final HttpClientRequestException
+ httpClientRequestException) {
final var dmiErrorMessage = new DmiErrorMessage();
final var dmiErrorResponse = new DmiErrorMessageDmiResponse();
dmiErrorResponse.setHttpCode(httpClientRequestException.getHttpStatus());
dmiErrorResponse.setBody(httpClientRequestException.getDetails());
dmiErrorMessage.setMessage(httpClientRequestException.getMessage());
dmiErrorMessage.setDmiResponse(dmiErrorResponse);
- return new ResponseEntity<>(dmiErrorMessage, httpStatus);
+ return new ResponseEntity<>(dmiErrorMessage, HttpStatus.BAD_GATEWAY);
}
}
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/PayloadTooLargeException.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/PayloadTooLargeException.java
new file mode 100644
index 0000000..cddbd08
--- /dev/null
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/exceptions/PayloadTooLargeException.java
@@ -0,0 +1,31 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.rest.exceptions;
+
+public class PayloadTooLargeException extends RuntimeException {
+
+ /**
+ * Instantiates a new payload too large exception.
+ */
+ public PayloadTooLargeException(final String message) {
+ super(message);
+ }
+}
diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
index bdd0e71..aef37c9 100644
--- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
+++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/handlers/NcmpDatastoreRequestHandlerSpec.groovy
@@ -27,6 +27,7 @@
import org.onap.cps.ncmp.api.models.DataOperationRequest
import org.onap.cps.ncmp.api.models.CmResourceAddress
import org.onap.cps.ncmp.rest.exceptions.OperationNotSupportedException
+import org.onap.cps.ncmp.rest.exceptions.PayloadTooLargeException
import org.onap.cps.ncmp.rest.executor.CpsNcmpTaskExecutor
import spock.lang.Specification
import spock.util.concurrent.PollingConditions
@@ -110,9 +111,7 @@
}
def 'Attempt to execute async data operation request with error #scenario'() {
- given: 'notification feature is turned on'
- objectUnderTest.notificationFeatureEnabled = true
- and: 'a data operation definition with datastore: #datastore'
+ given: 'a data operation definition with datastore: #datastore'
def dataOperationDefinition = new DataOperationDefinition(operation: 'read', datastore: datastore)
when: 'data operation request is executed'
def dataOperationRequest = new DataOperationRequest(dataOperationDefinitions: [dataOperationDefinition])
@@ -127,11 +126,9 @@
}
def 'Attempt to execute async data operation request with #scenario operation: #operation.'() {
- given: 'notification feature is turned on'
- objectUnderTest.notificationFeatureEnabled = true
- and: 'a data operation definition with operation: #operation'
+ given: 'a data operation definition with operation: #operation'
def dataOperationDefinition = new DataOperationDefinition(operation: operation, datastore: 'ncmp-datastore:passthrough-running')
- when: 'bulk request is executed'
+ when: 'data operation request is executed'
objectUnderTest.executeRequest('someTopic', new DataOperationRequest(dataOperationDefinitions:[dataOperationDefinition]), NO_AUTH_HEADER)
then: 'the expected type of exception is thrown'
thrown(expectedException)
@@ -144,4 +141,16 @@
'unsupported' | 'delete' || OperationNotSupportedException
}
+ def 'Attempt to execute async data operation request with too many cm handles.'() {
+ given: 'a data operation definition with too many cm handles'
+ def cmHandleIds = new String[51]
+ def dataOperationDefinition = new DataOperationDefinition(operationId: 'abc', operation: 'read', datastore: 'ncmp-datastore:passthrough-running', cmHandleIds: cmHandleIds)
+ when: 'data operation request is executed'
+ objectUnderTest.executeRequest('someTopic', new DataOperationRequest(dataOperationDefinitions:[dataOperationDefinition]), NO_AUTH_HEADER)
+ then: 'a payload too large exception is thrown'
+ def exceptionThrown = thrown(PayloadTooLargeException)
+ and: 'the error message contains the offending number of cm handles'
+ assert exceptionThrown.message == "Operation 'abc' affects too many (51) cm handles"
+ }
+
}
diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy
index dd02b31..a79ea25 100644
--- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy
+++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/exceptions/NetworkCmProxyRestExceptionHandlerSpec.groovy
@@ -1,7 +1,7 @@
/*
* ============LICENSE_START=======================================================
* Copyright (C) 2021 highstreet technologies GmbH
- * Modifications Copyright (C) 2021-2023 Nordix Foundation
+ * Modifications Copyright (C) 2021-2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,9 +23,10 @@
import static org.springframework.http.HttpStatus.BAD_GATEWAY
import static org.springframework.http.HttpStatus.BAD_REQUEST
+import static org.springframework.http.HttpStatus.CONFLICT
import static org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR
import static org.springframework.http.HttpStatus.NOT_FOUND
-import static org.springframework.http.HttpStatus.CONFLICT
+import static org.springframework.http.HttpStatus.PAYLOAD_TOO_LARGE
import static org.onap.cps.ncmp.rest.exceptions.NetworkCmProxyRestExceptionHandlerSpec.ApiType.NCMP
import static org.onap.cps.ncmp.rest.exceptions.NetworkCmProxyRestExceptionHandlerSpec.ApiType.NCMPINVENTORY
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get
@@ -111,22 +112,23 @@
dataNodeBaseEndpointNcmpInventory = "$basePathNcmpInventory/v1"
}
- def 'Get request with generic #scenario exception returns correct HTTP Status with #scenario'() {
+ def 'Get request with #scenario exception returns correct HTTP Status with #scenario'() {
when: 'an exception is thrown by the service'
setupTestException(exception, NCMP)
def response = performTestRequest(NCMP)
then: 'an HTTP response is returned with correct message and details'
assertTestResponse(response, expectedErrorCode, expectedErrorMessage, expectedErrorDetails)
where:
- scenario | exception || expectedErrorDetails | expectedErrorMessage | expectedErrorCode
- 'CPS' | new CpsException(sampleErrorMessage, sampleErrorDetails) || sampleErrorDetails | sampleErrorMessage | INTERNAL_SERVER_ERROR
- 'NCMP-server' | new ServerNcmpException(sampleErrorMessage, sampleErrorDetails) || null | sampleErrorMessage | INTERNAL_SERVER_ERROR
- 'NCMP-client' | new DmiRequestException(sampleErrorMessage, sampleErrorDetails) || null | sampleErrorMessage | BAD_REQUEST
- 'DataNode Validation' | new DataNodeNotFoundException('myDataspaceName', 'myAnchorName') || null | 'DataNode not found' | NOT_FOUND
- 'other' | new IllegalStateException(sampleErrorMessage) || null | sampleErrorMessage | INTERNAL_SERVER_ERROR
- 'Data Node Not Found' | new DataNodeNotFoundException('myDataspaceName', 'myAnchorName') || 'DataNode not found' | 'DataNode not found' | NOT_FOUND
- 'Existing entry' | new AlreadyDefinedException('name',null) || 'name already exists' | 'Already defined exception' | CONFLICT
- 'Existing entries' | AlreadyDefinedException.forDataNodes(['A', 'B'], 'myAnchorName') || '2 data node(s) already exist' | 'Already defined exception' | CONFLICT
+ scenario | exception || expectedErrorCode | expectedErrorMessage | expectedErrorDetails
+ 'CPS' | new CpsException(sampleErrorMessage, sampleErrorDetails) || INTERNAL_SERVER_ERROR | sampleErrorMessage | sampleErrorDetails
+ 'NCMP-server' | new ServerNcmpException(sampleErrorMessage, sampleErrorDetails) || INTERNAL_SERVER_ERROR | sampleErrorMessage | null
+ 'NCMP-client' | new DmiRequestException(sampleErrorMessage, sampleErrorDetails) || BAD_REQUEST | sampleErrorMessage | null
+ 'DataNode Validation' | new DataNodeNotFoundException('myDataspaceName', 'myAnchorName') || NOT_FOUND | 'DataNode not found' | null
+ 'other' | new IllegalStateException(sampleErrorMessage) || INTERNAL_SERVER_ERROR | sampleErrorMessage | null
+ 'Data Node Not Found' | new DataNodeNotFoundException('myDataspaceName', 'myAnchorName') || NOT_FOUND | 'DataNode not found' | 'DataNode not found'
+ 'Existing entry' | new AlreadyDefinedException('name',null) || CONFLICT | 'Already defined exception' | 'name already exists'
+ 'Existing entries' | AlreadyDefinedException.forDataNodes(['A', 'B'], 'myAnchorName') || CONFLICT | 'Already defined exception' | '2 data node(s) already exist'
+ 'Operation too large' | new PayloadTooLargeException(sampleErrorMessage) || PAYLOAD_TOO_LARGE | sampleErrorMessage | 'Check logs'
}
def 'Post request with exception returns correct HTTP Status.'() {
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/DataJobService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/DataJobService.java
new file mode 100644
index 0000000..6122afc
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/DataJobService.java
@@ -0,0 +1,46 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api;
+
+import org.onap.cps.ncmp.api.models.datajob.DataJobMetadata;
+import org.onap.cps.ncmp.api.models.datajob.DataJobReadRequest;
+import org.onap.cps.ncmp.api.models.datajob.DataJobWriteRequest;
+
+public interface DataJobService {
+
+ /**
+ * process read data job operations.
+ *
+ * @param dataJobId Unique identifier of the job within the request
+ * @param dataJobMetadata data job request headers
+ * @param dataJobReadRequest read data job request
+ */
+ void readDataJob(String dataJobId, DataJobMetadata dataJobMetadata, DataJobReadRequest dataJobReadRequest);
+
+ /**
+ * process write data job operations.
+ *
+ * @param dataJobId Unique identifier of the job within the request
+ * @param dataJobMetadata data job request headers
+ * @param dataJobWriteRequest write data job request
+ */
+ void writeDataJob(String dataJobId, DataJobMetadata dataJobMetadata, DataJobWriteRequest dataJobWriteRequest);
+}
\ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpResponseStatus.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpResponseStatus.java
index 462679e..bdc3dee 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpResponseStatus.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/NcmpResponseStatus.java
@@ -37,7 +37,8 @@
UNKNOWN_ERROR("108", "Unknown error"),
CM_HANDLE_ALREADY_EXIST("109", "cm-handle already exists"),
CM_HANDLE_INVALID_ID("110", "cm-handle has an invalid character(s) in id"),
- ALTERNATE_ID_ALREADY_ASSOCIATED("111", "alternate id already associated");
+ ALTERNATE_ID_ALREADY_ASSOCIATED("111", "alternate id already associated"),
+ MESSAGE_TOO_LARGE("112", "message too large");
private final String code;
private final String message;
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/DataJobServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/DataJobServiceImpl.java
new file mode 100644
index 0000000..b4377b8
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/DataJobServiceImpl.java
@@ -0,0 +1,43 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.api.DataJobService;
+import org.onap.cps.ncmp.api.models.datajob.DataJobMetadata;
+import org.onap.cps.ncmp.api.models.datajob.DataJobReadRequest;
+import org.onap.cps.ncmp.api.models.datajob.DataJobWriteRequest;
+
+@Slf4j
+public class DataJobServiceImpl implements DataJobService {
+
+ @Override
+ public void readDataJob(final String dataJobId, final DataJobMetadata dataJobMetadata,
+ final DataJobReadRequest dataJobReadRequest) {
+ log.info("data job id for read operation is: {}", dataJobId);
+ }
+
+ @Override
+ public void writeDataJob(final String dataJobId, final DataJobMetadata dataJobMetadata,
+ final DataJobWriteRequest dataJobWriteRequest) {
+ log.info("data job id for write operation is: {}", dataJobId);
+ }
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java
index 4f2674a..167df5a 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfig.java
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
+ * Copyright (C) 2023-2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.ssl.SslBundles;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@@ -51,6 +52,8 @@
private final KafkaProperties kafkaProperties;
+ private static final SslBundles NO_SSL = null;
+
/**
* This sets the strategy for creating legacy Kafka producer instance from kafka properties defined into
* application.yml and replaces value-serializer by JsonSerializer.
@@ -59,7 +62,7 @@
*/
@Bean
public ProducerFactory<String, T> legacyEventProducerFactory() {
- final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
+ final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(producerConfigProperties);
}
@@ -72,7 +75,7 @@
*/
@Bean
public ConsumerFactory<String, T> legacyEventConsumerFactory() {
- final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
+ final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
}
@@ -112,7 +115,7 @@
*/
@Bean
public ProducerFactory<String, CloudEvent> cloudEventProducerFactory() {
- final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
+ final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
return new DefaultKafkaProducerFactory<>(producerConfigProperties);
}
@@ -124,7 +127,7 @@
*/
@Bean
public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactory() {
- final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
+ final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
}
@@ -136,7 +139,8 @@
*/
@Bean
public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
- final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory());
+ final KafkaTemplate<String, CloudEvent> kafkaTemplate =
+ new KafkaTemplate<>(cloudEventProducerFactory());
kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
return kafkaTemplate;
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumer.java
index ea72fd2..82ae546 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumer.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumer.java
@@ -23,16 +23,21 @@
import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent;
import io.cloudevents.CloudEvent;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
+@RequiredArgsConstructor
public class CmNotificationSubscriptionDmiOutEventConsumer {
+ private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler;
+
/**
* Consume the Cm Notification Subscription event from the dmi-plugin.
*
@@ -56,7 +61,23 @@
final CmNotificationSubscriptionDmiOutEvent cmNotificationSubscriptionDmiOutEvent) {
final String subscriptionId = correlationId.split("#")[0];
final String dmiPluginName = correlationId.split("#")[1];
+
+ if ("ACCEPTED".equals(cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage())) {
+ handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.ACCEPTED);
+ dmiCmNotificationSubscriptionCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
+ }
+
+ if ("REJECTED".equals(cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage())) {
+ handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.REJECTED);
+ }
+
log.info("Cm Subscription with id : {} handled by the dmi-plugin : {} has the status : {}", subscriptionId,
dmiPluginName, cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage());
}
+
+ private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName,
+ final CmNotificationSubscriptionStatus cmNotificationSubscriptionStatus) {
+ dmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi(subscriptionId,
+ dmiPluginName, cmNotificationSubscriptionStatus);
+ }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
index 2f10b1c..8c1cac3 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandler.java
@@ -30,8 +30,10 @@
import java.util.Map;
import java.util.Set;
import lombok.RequiredArgsConstructor;
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionPredicate;
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService;
import org.onap.cps.ncmp.api.impl.inventory.InventoryPersistence;
import org.onap.cps.ncmp.api.impl.operations.DatastoreType;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
@@ -42,6 +44,7 @@
@RequiredArgsConstructor
public class DmiCmNotificationSubscriptionCacheHandler {
+ private final CmNotificationSubscriptionPersistenceService cmNotificationSubscriptionPersistenceService;
private final Map<String, Map<String, DmiCmNotificationSubscriptionDetails>> cmNotificationSubscriptionCache;
private final InventoryPersistence inventoryPersistence;
@@ -83,6 +86,46 @@
return dmiCmNotificationSubscriptionDetailsPerDmi;
}
+ /**
+ * Update status in map of subscription details per DMI.
+ *
+ * @param subscriptionId String of subscription Id
+ * @param dmiServiceName String of dmiServiceName
+ * @param status String of status
+ *
+ */
+ public void updateDmiCmNotificationSubscriptionStatusPerDmi(
+ final String subscriptionId, final String dmiServiceName, final CmNotificationSubscriptionStatus status) {
+ cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
+ .setCmNotificationSubscriptionStatus(status);
+ }
+
+ /**
+ * Persist map of subscription details per DMI.
+ *
+ * @param subscriptionId String of subscription Id
+ * @param dmiServiceName String of dmiServiceName
+ *
+ */
+ public void persistIntoDatabasePerDmi(final String subscriptionId, final String dmiServiceName) {
+ final List<DmiCmNotificationSubscriptionPredicate> dmiCmNotificationSubscriptionPredicateList =
+ cmNotificationSubscriptionCache.get(subscriptionId).get(dmiServiceName)
+ .getDmiCmNotificationSubscriptionPredicates();
+ for (final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate:
+ dmiCmNotificationSubscriptionPredicateList) {
+ final DatastoreType datastoreType = dmiCmNotificationSubscriptionPredicate.getDatastoreType();
+ final Set<String> cmHandles = dmiCmNotificationSubscriptionPredicate.getTargetCmHandleIds();
+ final Set<String> xpaths = dmiCmNotificationSubscriptionPredicate.getXpaths();
+
+ for (final String cmHandle: cmHandles) {
+ for (final String xpath: xpaths) {
+ cmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(datastoreType,
+ cmHandle, xpath, subscriptionId);
+ }
+ }
+ }
+ }
+
private void updateDmiCmNotificationSubscriptionDetailsPerDmi(
final String dmiServiceName,
final DmiCmNotificationSubscriptionPredicate dmiCmNotificationSubscriptionPredicate,
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobMetadata.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobMetadata.java
new file mode 100644
index 0000000..dc8037b
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobMetadata.java
@@ -0,0 +1,33 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.models.datajob;
+
+/**
+ * Metadata of read/write data job request.
+ *
+ * @param destination The destination of the data job results.
+ * @param dataAcceptType Define the data response accept type.
+ * e.g. "application/vnd.3gpp.object-tree-hierarchical+json",
+ * "application/vnd.3gpp.object-tree-flat+json" etc.
+ * @param dataContentType Define the data request content type.
+ * e.g. "application/3gpp-json-patch+json" etc.
+ */
+public record DataJobMetadata(String destination, String dataAcceptType, String dataContentType) {}
\ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobReadRequest.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobReadRequest.java
new file mode 100644
index 0000000..f861c3d
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobReadRequest.java
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.models.datajob;
+
+import java.util.List;
+
+/**
+ * Describes the read data job operation to be forwarded to dmi.
+ *
+ * @param data List of read operations to be executed.
+ */
+public record DataJobReadRequest(List<ReadOperation> data) {}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobWriteRequest.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobWriteRequest.java
new file mode 100644
index 0000000..254e198
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/DataJobWriteRequest.java
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.models.datajob;
+
+import java.util.List;
+
+/**
+ * Describes the write data job operation to be forwarded to dmi.
+ *
+ * @param data List of write operations to be executed.
+ */
+public record DataJobWriteRequest(List<WriteOperation> data) {}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/ReadOperation.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/ReadOperation.java
new file mode 100644
index 0000000..d2b0738
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/ReadOperation.java
@@ -0,0 +1,43 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.models.datajob;
+
+import java.util.List;
+
+/**
+ * Holds information of read data job operation.
+ * based on <a href="https://www.etsi.org/deliver/etsi_ts/128500_128599/128532/16.04.00_60/ts_128532v160400p.pdf">ETSI TS 128 532 V16.4.0 (2020-08)</a>
+ *
+ * @param path Identifier of a managed object (MO) on a network element. Defines the resource on which operation
+ * is executed. Url Encoded Fully Distinguished Name (FDN).
+ * @param op Describes the operation to execute. The value can only be "read".
+ * @param operationId Unique identifier of the operation within the request.
+ * @param attributes Specifies the attributes of the resources that are returned.
+ * @param fields Specifies the attribute fields of the resources that are returned. This should be used if an
+ * attribute is a struct and only a subset of its fields should be returned.
+ * @param filter This filters the managed Objects.
+ * @param scopeType This selects MOs depending on relationships with Base Managed Object.
+ * e.g. "BASE_ONLY", "BASE_ALL", "BASE_NTH_LEVEL" etc.
+ * @param scopeLevel Defines the level for objects to be returned for certain scopeTypes. The base level is zero.
+ */
+public record ReadOperation(String path, String op, String operationId, List<String> attributes, List<String> fields,
+ String filter, String scopeType, int scopeLevel) {
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/WriteOperation.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/WriteOperation.java
new file mode 100644
index 0000000..c2f6504
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/models/datajob/WriteOperation.java
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.models.datajob;
+
+/**
+ * Holds information of write data job operation.
+ * based on <a href="https://www.etsi.org/deliver/etsi_ts/128500_128599/128532/16.04.00_60/ts_128532v160400p.pdf">ETSI TS 128 532 V16.4.0 (2020-08)</a>
+ *
+ * @param path Identifier of a managed object (MO) on a network element. Defines the resource on which operation
+ * is executed. Typically, is Fully Distinguished Name (FDN).
+ * @param op Describes the operation to execute. The value can be as below:
+ * e.g. "add", "replace", "remove", "action" etc.
+ * @param operationId Unique identifier of the operation within the request.
+ * @param value The value to be written depends on the type of operation.
+ */
+public record WriteOperation(String path, String op, String operationId, Object value) {}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/DataJobServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/DataJobServiceImplSpec.groovy
new file mode 100644
index 0000000..4378764
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/DataJobServiceImplSpec.groovy
@@ -0,0 +1,79 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl
+
+import ch.qos.logback.classic.Level
+import ch.qos.logback.classic.Logger
+import ch.qos.logback.classic.spi.ILoggingEvent
+import ch.qos.logback.core.read.ListAppender
+import org.slf4j.LoggerFactory
+import org.onap.cps.ncmp.api.models.datajob.DataJobReadRequest
+import org.onap.cps.ncmp.api.models.datajob.DataJobWriteRequest
+import org.onap.cps.ncmp.api.models.datajob.DataJobMetadata
+import org.onap.cps.ncmp.api.models.datajob.ReadOperation
+import org.onap.cps.ncmp.api.models.datajob.WriteOperation
+import spock.lang.Specification
+
+class DataJobServiceImplSpec extends Specification{
+
+ def objectUnderTest = new DataJobServiceImpl()
+
+ def logger = Spy(ListAppender<ILoggingEvent>)
+
+ def setup() {
+ setupLogger()
+ }
+
+ def cleanup() {
+ ((Logger) LoggerFactory.getLogger(DataJobServiceImpl.class)).detachAndStopAllAppenders()
+ }
+
+ def '#operation data job request.'() {
+ given: 'data job metadata'
+ def dataJobMetadata = new DataJobMetadata('client-topic', 'application/vnd.3gpp.object-tree-hierarchical+json', 'application/3gpp-json-patch+json')
+ when: 'read/write data job request is processed'
+ if (operation == 'read') {
+ objectUnderTest.readDataJob('some-job-id', dataJobMetadata, new DataJobReadRequest([getWriteOrReadOperationRequest(operation)]))
+ } else {
+ objectUnderTest.writeDataJob('some-job-id', dataJobMetadata, new DataJobWriteRequest([getWriteOrReadOperationRequest(operation)]))
+ }
+ then: 'the data job id is correctly logged'
+ def loggingEvent = logger.list[0]
+ assert loggingEvent.level == Level.INFO
+ assert loggingEvent.formattedMessage.contains('data job id for ' + operation + ' operation is: some-job-id')
+ where: 'the following data job operations are used'
+ operation << ['read', 'write']
+ }
+
+ def getWriteOrReadOperationRequest(operation) {
+ if (operation == 'write') {
+ return new WriteOperation('some/write/path', 'add', 'some-operation-id', 'some-value')
+ }
+ return new ReadOperation('some/read/path', 'read', 'some-operation-id', ['some-attrib-1'], ['some-field-1'], 'some-filter', 'some-scope-type', 1)
+ }
+
+ def setupLogger() {
+ def setupLogger = ((Logger) LoggerFactory.getLogger(DataJobServiceImpl.class))
+ setupLogger.setLevel(Level.DEBUG)
+ setupLogger.addAppender(logger)
+ logger.start()
+ }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy
index d47be6c..4d0af6f 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImplSpec.groovy
@@ -23,9 +23,6 @@
package org.onap.cps.ncmp.api.impl
-import org.onap.cps.ncmp.api.models.DmiPluginRegistrationResponse
-import org.onap.cps.ncmp.api.models.CmResourceAddress
-
import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME
import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NCMP_DATASPACE_NAME
import static org.onap.cps.ncmp.api.impl.ncmppersistence.NcmpPersistence.NCMP_DMI_REGISTRY_ANCHOR
@@ -35,6 +32,8 @@
import static org.onap.cps.ncmp.api.impl.operations.OperationType.CREATE
import static org.onap.cps.ncmp.api.impl.operations.OperationType.UPDATE
+import org.onap.cps.ncmp.api.models.DmiPluginRegistrationResponse
+import org.onap.cps.ncmp.api.models.CmResourceAddress
import org.onap.cps.ncmp.api.impl.utils.AlternateIdChecker
import com.hazelcast.map.IMap
import org.onap.cps.ncmp.api.NetworkCmProxyCmHandleQueryService
@@ -57,7 +56,6 @@
import org.onap.cps.ncmp.api.models.DataOperationRequest
import org.onap.cps.spi.exceptions.CpsException
import org.onap.cps.spi.model.ConditionProperties
-
import java.util.stream.Collectors
import org.onap.cps.utils.JsonObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy
index d5b0915..16f27d0 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/kafka/KafkaConfigSpec.groovy
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
+ * Copyright (C) 2023-2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy
index 4f0132e..523ec76 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/CmNotificationSubscriptionDmiOutEventConsumerSpec.groovy
@@ -28,26 +28,32 @@
import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent
+import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.slf4j.LoggerFactory
+import org.spockframework.spring.SpringBean
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
class CmNotificationSubscriptionDmiOutEventConsumerSpec extends MessagingBaseSpec {
- def objectUnderTest = new CmNotificationSubscriptionDmiOutEventConsumer()
- def logger = Spy(ListAppender<ILoggingEvent>)
-
@Autowired
JsonObjectMapper jsonObjectMapper
@Autowired
ObjectMapper objectMapper
+ @SpringBean
+ DmiCmNotificationSubscriptionCacheHandler mockDmiCmNotificationSubscriptionCacheHandler = Mock(DmiCmNotificationSubscriptionCacheHandler)
+
+ def objectUnderTest = new CmNotificationSubscriptionDmiOutEventConsumer(mockDmiCmNotificationSubscriptionCacheHandler)
+ def logger = Spy(ListAppender<ILoggingEvent>)
+
void setup() {
((Logger) LoggerFactory.getLogger(CmNotificationSubscriptionDmiOutEventConsumer.class)).addAppender(logger)
logger.start()
@@ -78,6 +84,29 @@
assert loggingEvent.formattedMessage == 'Cm Subscription with id : sub-1 handled by the dmi-plugin : test-dmi-plugin-name has the status : accepted'
}
+ def 'Consume a valid CM Notification Subscription Event and perform correct actions base on status'() {
+ given: 'a cmNotificationSubscription event'
+ def dmiOutEventData = new Data(statusCode: statusCode, statusMessage: subscriptionStatus.toString())
+ def cmNotificationSubscriptionDmiOutEvent = new CmNotificationSubscriptionDmiOutEvent().withData(dmiOutEventData)
+ def testCloudEventSent = CloudEventBuilder.v1()
+ .withData(objectMapper.writeValueAsBytes(cmNotificationSubscriptionDmiOutEvent))
+ .withId('random-uuid')
+ .withType('subscriptionCreateResponse')
+ .withSource(URI.create('test-dmi-plugin-name'))
+ .withExtension('correlationid', 'sub-1#test-dmi-plugin-name').build()
+ def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
+ when: 'the event is consumed'
+ objectUnderTest.consumeCmNotificationSubscriptionDmiOutEvent(consumerRecord)
+ then: 'correct number of calls to cache'
+ expectedCacheCalls * mockDmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi('sub-1','test-dmi-plugin-name', subscriptionStatus)
+ and: 'correct number of calls to persist cache'
+ expectedPersistenceCalls * mockDmiCmNotificationSubscriptionCacheHandler.persistIntoDatabasePerDmi('sub-1','test-dmi-plugin-name')
+ where: 'the following parameters are used'
+ scenario | subscriptionStatus | statusCode || expectedCacheCalls | expectedPersistenceCalls
+ 'Accepted Status' | CmNotificationSubscriptionStatus.ACCEPTED | '1' || 1 | 1
+ 'Rejected Status' | CmNotificationSubscriptionStatus.REJECTED | '2' || 1 | 0
+ }
+
def getLoggingEvent() {
return logger.list[0]
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
index 132c4bc..47a1c89 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/cmsubscription/DmiCmNotificationSubscriptionCacheHandlerSpec.groovy
@@ -24,6 +24,8 @@
import io.cloudevents.CloudEvent
import io.cloudevents.core.builder.CloudEventBuilder
import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus
+import org.onap.cps.ncmp.api.impl.events.cmsubscription.service.CmNotificationSubscriptionPersistenceService
import org.onap.cps.ncmp.api.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
@@ -45,9 +47,11 @@
ObjectMapper objectMapper
@SpringBean
InventoryPersistence mockInventoryPersistence = Mock(InventoryPersistence)
+ @SpringBean
+ CmNotificationSubscriptionPersistenceService mockCmNotificationSubscriptionPersistenceService = Mock(CmNotificationSubscriptionPersistenceService)
def testCache = [:]
- def objectUnderTest = new DmiCmNotificationSubscriptionCacheHandler(testCache, mockInventoryPersistence)
+ def objectUnderTest = new DmiCmNotificationSubscriptionCacheHandler(mockCmNotificationSubscriptionPersistenceService, testCache, mockInventoryPersistence)
CmNotificationSubscriptionNcmpInEvent cmNotificationSubscriptionNcmpInEvent
def yangModelCmHandle1 = new YangModelCmHandle(id:'ch1',dmiServiceName:'dmi-1')
@@ -62,9 +66,9 @@
def 'Load CM subscription event to cache'() {
given: 'a valid subscription event with Id'
- def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId();
+ def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId()
and: 'list of predicates'
- def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates();
+ def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates()
when: 'a valid event object loaded in cache'
objectUnderTest.add(subscriptionId, predicates)
then: 'the cache contains the correct entry with #subscriptionId subscription ID'
@@ -115,6 +119,29 @@
assert mapOfCMHandleIDsByDmi.get('dmi-2') == ['ch2'].toSet()
}
+ def 'Update subscription status in cache per DMI service name'() {
+ given: 'populated cache'
+ def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates()
+ def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId()
+ objectUnderTest.add(subscriptionId, predicates)
+ when: 'subscription status per dmi is updated in cache'
+ objectUnderTest.updateDmiCmNotificationSubscriptionStatusPerDmi(subscriptionId,'dmi-1', CmNotificationSubscriptionStatus.ACCEPTED)
+ then: 'verify status has been updated in cache'
+ def predicate = testCache.get(subscriptionId)
+ assert predicate.get('dmi-1').cmNotificationSubscriptionStatus == CmNotificationSubscriptionStatus.ACCEPTED
+ }
+
+ def 'Persist Cache into database per dmi'() {
+ given: 'populate cache'
+ def predicates = cmNotificationSubscriptionNcmpInEvent.getData().getPredicates()
+ def subscriptionId = cmNotificationSubscriptionNcmpInEvent.getData().getSubscriptionId()
+ objectUnderTest.add(subscriptionId, predicates)
+ when: 'subscription is persisted in database'
+ objectUnderTest.persistIntoDatabasePerDmi(subscriptionId,'dmi-1')
+ then: 'persistence service is called the correct number of times per dmi'
+ 4 * mockCmNotificationSubscriptionPersistenceService.addOrUpdateCmNotificationSubscription(_,_,_,subscriptionId)
+ }
+
def setUpTestEvent(){
def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
def testEventSent = jsonObjectMapper.convertJsonString(jsonData, CmNotificationSubscriptionNcmpInEvent.class)
diff --git a/docs/cps-ncmp-message-status-codes.rst b/docs/cps-ncmp-message-status-codes.rst
index 90590a2..018cf4a 100644
--- a/docs/cps-ncmp-message-status-codes.rst
+++ b/docs/cps-ncmp-message-status-codes.rst
@@ -40,6 +40,8 @@
+-----------------+------------------------------------------------------+-----------------------------------+
| 111 | alternate id already associated | Inventory |
+-----------------+------------------------------------------------------+-----------------------------------+
+ | 112 | message too large | Data Operation |
+ +-----------------+------------------------------------------------------+-----------------------------------+
.. note::