Add multi-threaded Integration Test for Module Sync
- Add tests for multi threaded scenarios around module sync
- Disabled ModuleSyncWatchdog timer using long delay and interval
- Call Module Sync method as needed for more control
(sometimes it needs to be triggered twice like retry use cases as designed)
- Improve NCMP performance test setup (consistent naming etc.)
- Rename some production code method names to better reflect functionality
- Disabled intermittent failing test for create cm handle as it is not asserting the correct message
- Improved Code Coverage ModuleSyncWatchdog
Issue-ID: CPS-2462
Change-Id: Ia907af77d2037309f1bbb73ea671679b788bab9e
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyInventoryController.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyInventoryController.java
index 8aa86ad..cea3d2a 100755
--- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyInventoryController.java
+++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyInventoryController.java
@@ -85,7 +85,7 @@
public ResponseEntity updateDmiPluginRegistration(
final @Valid RestDmiPluginRegistration restDmiPluginRegistration) {
final DmiPluginRegistrationResponse dmiPluginRegistrationResponse =
- networkCmProxyInventoryFacade.updateDmiRegistrationAndSyncModule(
+ networkCmProxyInventoryFacade.updateDmiRegistration(
ncmpRestInputMapper.toDmiPluginRegistration(restDmiPluginRegistration));
final DmiPluginRegistrationErrorResponse failedRegistrationErrorResponse =
getFailureRegistrationResponse(dmiPluginRegistrationResponse);
diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyInventoryControllerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyInventoryControllerSpec.groovy
index 97c68f0..d7ac466 100644
--- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyInventoryControllerSpec.groovy
+++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyInventoryControllerSpec.groovy
@@ -84,7 +84,7 @@
.content(jsonData)
).andReturn().response
then: 'the converted object is forwarded to the registration service'
- 1 * mockNetworkCmProxyInventoryFacade.updateDmiRegistrationAndSyncModule(mockDmiPluginRegistration) >> new DmiPluginRegistrationResponse()
+ 1 * mockNetworkCmProxyInventoryFacade.updateDmiRegistration(mockDmiPluginRegistration) >> new DmiPluginRegistrationResponse()
and: 'response status is no content'
response.status == HttpStatus.OK.value()
where: 'the following registration json is used'
@@ -181,7 +181,7 @@
updatedCmHandles: [CmHandleRegistrationResponse.createSuccessResponse('cm-handle-2')],
removedCmHandles: [CmHandleRegistrationResponse.createSuccessResponse('cm-handle-3')]
)
- mockNetworkCmProxyInventoryFacade.updateDmiRegistrationAndSyncModule(*_) >> dmiRegistrationResponse
+ mockNetworkCmProxyInventoryFacade.updateDmiRegistration(*_) >> dmiRegistrationResponse
when: 'registration endpoint is invoked'
def response = mvc.perform(
post("$ncmpBasePathV1/ch")
@@ -205,7 +205,7 @@
removedCmHandles: [removeCmHandleResponse],
upgradedCmHandles: [upgradeCmHandleResponse]
)
- mockNetworkCmProxyInventoryFacade.updateDmiRegistrationAndSyncModule(*_) >> dmiRegistrationResponse
+ mockNetworkCmProxyInventoryFacade.updateDmiRegistration(*_) >> dmiRegistrationResponse
when: 'registration endpoint is invoked'
def response = mvc.perform(
post("$ncmpBasePathV1/ch")
diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyRestExceptionHandlerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyRestExceptionHandlerSpec.groovy
index e87acac..c75058b 100644
--- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyRestExceptionHandlerSpec.groovy
+++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyRestExceptionHandlerSpec.groovy
@@ -35,7 +35,6 @@
import org.onap.cps.ncmp.impl.data.NcmpCachedResourceRequestHandler
import org.onap.cps.ncmp.impl.data.NcmpPassthroughResourceRequestHandler
import org.onap.cps.ncmp.impl.data.NetworkCmProxyFacade
-import org.onap.cps.ncmp.impl.data.policyexecutor.PolicyExecutor
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.rest.util.CmHandleStateMapper
import org.onap.cps.ncmp.rest.util.DataOperationRequestMapper
@@ -170,7 +169,7 @@
if (NCMP == apiType) {
mockNetworkCmProxyInventoryFacade.getYangResourcesModuleReferences(*_) >> { throw exception }
}
- mockNetworkCmProxyInventoryFacade.updateDmiRegistrationAndSyncModule(*_) >> { throw exception }
+ mockNetworkCmProxyInventoryFacade.updateDmiRegistration(*_) >> { throw exception }
}
def performTestRequest(apiType) {
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/NetworkCmProxyInventoryFacade.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/NetworkCmProxyInventoryFacade.java
index 1fa801c..a4ea7b4 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/NetworkCmProxyInventoryFacade.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/NetworkCmProxyInventoryFacade.java
@@ -64,15 +64,16 @@
private final TrustLevelManager trustLevelManager;
private final AlternateIdMatcher alternateIdMatcher;
+
+
/**
* Registration of Created, Removed, Updated or Upgraded CM Handles.
*
* @param dmiPluginRegistration Dmi Plugin Registration details
* @return dmiPluginRegistrationResponse
*/
- public DmiPluginRegistrationResponse updateDmiRegistrationAndSyncModule(
- final DmiPluginRegistration dmiPluginRegistration) {
- return cmHandleRegistrationService.updateDmiRegistrationAndSyncModule(dmiPluginRegistration);
+ public DmiPluginRegistrationResponse updateDmiRegistration(final DmiPluginRegistration dmiPluginRegistration) {
+ return cmHandleRegistrationService.updateDmiRegistration(dmiPluginRegistration);
}
/**
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationService.java
index d9f7e38..cb55b09 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationService.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationService.java
@@ -87,8 +87,7 @@
* @param dmiPluginRegistration Dmi Plugin Registration details
* @return dmiPluginRegistrationResponse
*/
- public DmiPluginRegistrationResponse updateDmiRegistrationAndSyncModule(
- final DmiPluginRegistration dmiPluginRegistration) {
+ public DmiPluginRegistrationResponse updateDmiRegistration(final DmiPluginRegistration dmiPluginRegistration) {
dmiPluginRegistration.validateDmiPluginRegistration();
final DmiPluginRegistrationResponse dmiPluginRegistrationResponse = new DmiPluginRegistrationResponse();
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
index 6b34527..898b8d5 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdog.java
@@ -32,6 +32,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
+import org.onap.cps.ncmp.impl.utils.Sleeper;
import org.onap.cps.spi.model.DataNode;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@@ -47,6 +48,7 @@
private final ModuleSyncTasks moduleSyncTasks;
private final AsyncTaskExecutor asyncTaskExecutor;
private final Lock workQueueLock;
+ private final Sleeper sleeper;
private static final int MODULE_SYNC_BATCH_SIZE = 100;
private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10;
@@ -59,9 +61,10 @@
* Check DB for any cm handles in 'ADVISED' state.
* Queue and create batches to process them asynchronously.
* This method will only finish when there are no more 'ADVISED' cm handles in the DB.
- * This method wil be triggered on a configurable interval
+ * This method is triggered on a configurable interval (ncmp.timers.advised-modules-sync.sleep-time-ms)
*/
- @Scheduled(fixedDelayString = "${ncmp.timers.advised-modules-sync.sleep-time-ms:5000}")
+ @Scheduled(initialDelayString = "${test.ncmp.timers.advised-modules-sync.initial-delay-ms:0}",
+ fixedDelayString = "${ncmp.timers.advised-modules-sync.sleep-time-ms:5000}")
public void moduleSyncAdvisedCmHandles() {
log.debug("Processing module sync watchdog waking up.");
populateWorkQueueIfNeeded();
@@ -82,16 +85,12 @@
}
}
- private void preventBusyWait() {
- try {
- log.debug("Busy waiting now");
- TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS);
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- private void populateWorkQueueIfNeeded() {
+ /**
+ * Populate work queue with advised cm handles from db.
+ * This method is made public for (integration) testing purposes.
+ * So it can be tested without the queue being emptied immediately as the main public method does.
+ */
+ public void populateWorkQueueIfNeeded() {
if (moduleSyncWorkQueue.isEmpty() && workQueueLock.tryLock()) {
try {
populateWorkQueue();
@@ -154,4 +153,13 @@
log.info("nextBatch size : {}", nextBatch.size());
return nextBatch;
}
+
+ private void preventBusyWait() {
+ try {
+ log.debug("Busy waiting now");
+ sleeper.haveALittleRest(PREVENT_CPU_BURN_WAIT_TIME_MILLIS);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java
new file mode 100644
index 0000000..7a02fa0
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/utils/Sleeper.java
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.utils;
+
+import java.util.concurrent.TimeUnit;
+import org.springframework.stereotype.Service;
+
+/**
+ * This class is to extract out sleep functionality so the interrupted exception handling can
+ * be covered with a test (e.g. using spy on Sleeper) and help to get to 100% code coverage.
+ */
+@Service
+public class Sleeper {
+ public void haveALittleRest(final long timeInMillis) throws InterruptedException {
+ TimeUnit.MILLISECONDS.sleep(timeInMillis);
+ }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServiceSpec.groovy
index dcff2e9..70e26d9 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServiceSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServiceSpec.groovy
@@ -87,7 +87,7 @@
and: 'cm handle is in READY state'
mockCmHandleQueries.cmHandleHasState('cmhandle-3', CmHandleState.READY) >> true
when: 'registration is processed'
- objectUnderTest.updateDmiRegistrationAndSyncModule(dmiRegistration)
+ objectUnderTest.updateDmiRegistration(dmiRegistration)
then: 'cm-handles are removed first'
1 * objectUnderTest.processRemovedCmHandles(*_)
and: 'de-registered cm handle entry is removed from in progress map'
@@ -108,7 +108,7 @@
and: 'exception while checking cm handle state'
mockInventoryPersistence.getYangModelCmHandle('cmhandle-3') >> new YangModelCmHandle(id: 'cmhandle-3', moduleSetTag: '', compositeState: new CompositeState(cmHandleState: cmHandleState))
when: 'registration is processed'
- def result = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiRegistration)
+ def result = objectUnderTest.updateDmiRegistration(dmiRegistration)
then: 'upgrade operation contains expected error code'
assert result.upgradedCmHandles[0].status == expectedResponseStatus
where: 'the following parameters are used'
@@ -124,7 +124,7 @@
and: 'exception while checking cm handle state'
mockInventoryPersistence.getYangModelCmHandle('cmhandle-3') >> { throw exception }
when: 'registration is processed'
- def result = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiRegistration)
+ def result = objectUnderTest.updateDmiRegistration(dmiRegistration)
then: 'upgrade operation contains expected error code'
assert result.upgradedCmHandles.ncmpResponseStatus.code[0] == expectedErrorCode
where: 'the following parameters are used'
@@ -139,7 +139,7 @@
dmiDataPlugin: dmiDataPlugin)
dmiPluginRegistration.createdCmHandles = [ncmpServiceCmHandle]
when: 'update registration and sync module is called with correct DMI plugin information'
- objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'create cm handles registration and sync modules is called with the correct plugin information'
1 * objectUnderTest.processCreatedCmHandles(dmiPluginRegistration, _)
where:
@@ -155,7 +155,7 @@
dmiDataPlugin: dmiDataPlugin)
dmiPluginRegistration.createdCmHandles = [ncmpServiceCmHandle]
when: 'registration is called with incorrect DMI plugin information'
- objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'a DMI Request Exception is thrown with correct message details'
def exceptionThrown = thrown(DmiRequestException.class)
assert exceptionThrown.getMessage().contains(expectedMessageDetails)
@@ -178,7 +178,7 @@
def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: 'my-server')
dmiPluginRegistration.createdCmHandles = [new NcmpServiceCmHandle(cmHandleId: 'cmhandle', dmiProperties: dmiProperties, publicProperties: publicProperties)]
when: 'registration is updated'
- def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'a successful response is received'
response.createdCmHandles.size() == 1
with(response.createdCmHandles[0]) {
@@ -206,7 +206,7 @@
def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: 'my-server',
createdCmHandles:[new NcmpServiceCmHandle(cmHandleId: 'ch-1', registrationTrustLevel: registrationTrustLevel)])
when: 'registration is updated'
- objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'trustLevel is set for the created cm-handle'
1 * mockTrustLevelManager.registerCmHandles(expectedMapping)
where:
@@ -225,7 +225,7 @@
def xpath = "somePathWithId[@id='cmhandle2']"
mockLcmEventsCmHandleStateHandler.initiateStateAdvised(*_) >> { throw AlreadyDefinedException.forDataNodes([xpath], 'some-context') }
when: 'registration is updated to create cm-handles'
- def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'a response is received for all cm-handles'
response.createdCmHandles.size() == 1
and: 'all cm-handles creation fails'
@@ -244,7 +244,7 @@
and: 'cm-handler registration fails: #scenario'
mockLcmEventsCmHandleStateHandler.initiateStateAdvised(*_) >> { throw exception }
when: 'registration is updated'
- def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'a failure response is received'
response.createdCmHandles.size() == 1
with(response.createdCmHandles[0]) {
@@ -269,7 +269,7 @@
CmHandleRegistrationResponse.createFailureResponse('cm handle 4', CM_HANDLE_INVALID_ID)]
mockNetworkCmProxyDataServicePropertyHandler.updateCmHandleProperties(_) >> updateOperationResponse
when: 'registration is updated'
- def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'the response contains updateOperationResponse'
assert response.updatedCmHandles.size() == 4
assert response.updatedCmHandles.containsAll(updateOperationResponse)
@@ -281,7 +281,7 @@
and: '#scenario'
mockCpsModuleService.deleteSchemaSetsWithCascade(_, ['cmhandle']) >> { if (!schemaSetExist) { throw new SchemaSetNotFoundException('', '') } }
when: 'registration is updated to delete cmhandle'
- def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'the cmHandle state is updated to "DELETING"'
1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch(_) >>
{ args -> args[0].values()[0] == CmHandleState.DELETING }
@@ -315,7 +315,7 @@
and: 'cm-handle deletion is successful for 1st and 3rd; failed for 2nd'
mockInventoryPersistence.deleteDataNode("/dmi-registry/cm-handles[@id='cmhandle2']") >> { throw new RuntimeException("Failed") }
when: 'registration is updated to delete cmhandles'
- def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'the cmHandle states are all updated to "DELETING"'
1 * mockLcmEventsCmHandleStateHandler.updateCmHandleStateBatch({ assert it.every { entry -> entry.value == CmHandleState.DELETING } })
and: 'a response is received for all cm-handles'
@@ -361,7 +361,7 @@
and: 'schema set single deletion failed with unknown error'
mockInventoryPersistence.deleteSchemaSetWithCascade(_) >> { throw new RuntimeException('Failed') }
when: 'registration is updated to delete cmhandle'
- def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'no exception is thrown'
noExceptionThrown()
and: 'cm-handle is not deleted'
@@ -387,7 +387,7 @@
and: 'cm-handle deletion fails on individual delete'
mockInventoryPersistence.deleteDataNode(_) >> { throw deleteListElementException }
when: 'registration is updated to delete cmhandle'
- def response = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ def response = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'a failure response is received'
assert response.removedCmHandles.size() == 1
with(response.removedCmHandles[0]) {
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/NetworkCmProxyInventoryFacadeSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/NetworkCmProxyInventoryFacadeSpec.groovy
index fec0755..a7dd38c 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/NetworkCmProxyInventoryFacadeSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/NetworkCmProxyInventoryFacadeSpec.groovy
@@ -56,9 +56,9 @@
given: 'an (updated) dmi plugin registration'
def dmiPluginRegistration = Mock(DmiPluginRegistration)
when: 'the registration is submitted '
- objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'the call is delegated to the cm handle registration service'
- 1 * mockCmHandleRegistrationService.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ 1 * mockCmHandleRegistrationService.updateDmiRegistration(dmiPluginRegistration)
}
def 'Execute cm handle id search for inventory'() {
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
index 3064a78..f2c88a5 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/ModuleSyncWatchdogSpec.groovy
@@ -23,6 +23,7 @@
import com.hazelcast.map.IMap
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
+import org.onap.cps.ncmp.impl.utils.Sleeper
import org.onap.cps.spi.model.DataNode
import spock.lang.Specification
@@ -31,7 +32,7 @@
class ModuleSyncWatchdogSpec extends Specification {
- def mockSyncUtils = Mock(ModuleOperationsUtils)
+ def mockModuleOperationsUtils = Mock(ModuleOperationsUtils)
def static testQueueCapacity = 50 + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE
@@ -45,16 +46,21 @@
def mockWorkQueueLock = Mock(Lock)
- def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockWorkQueueLock)
+ def spiedSleeper = Spy(Sleeper)
+
+ def objectUnderTest = new ModuleSyncWatchdog(mockModuleOperationsUtils, moduleSyncWorkQueue , mockModuleSyncStartedOnCmHandles, mockModuleSyncTasks, spiedAsyncTaskExecutor, mockWorkQueueLock, spiedSleeper)
void setup() {
spiedAsyncTaskExecutor.setupThreadPool()
- mockWorkQueueLock.tryLock() >> true
}
def 'Module sync advised cm handles with #scenario.'() {
- given: 'sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
- mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles)
+ given: 'module sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
+ mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles)
+ and: 'module sync utilities returns no failed (locked) cm handles'
+ mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> []
+ and: 'the work queue is not locked'
+ mockWorkQueueLock.tryLock() >> true
and: 'the executor has enough available threads'
spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 3
when: ' module sync is started'
@@ -63,6 +69,7 @@
expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_)
where: 'the following parameter are used'
scenario | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions
+ 'none at all' | 0 || 0
'less then 1 batch' | 1 || 1
'exactly 1 batch' | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 1
'2 batches' | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 2
@@ -70,9 +77,11 @@
'over queue capacity' | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3
}
- def 'Module sync advised cm handles starts with no available threads.'() {
- given: 'sync utilities returns a advise cm handles'
- mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(1)
+ def 'Module sync cm handles starts with no available threads.'() {
+ given: 'module sync utilities returns a advise cm handles'
+ mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(1)
+ and: 'the work queue is not locked'
+ mockWorkQueueLock.tryLock() >> true
and: 'the executor first has no threads but has one thread on the second attempt'
spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >>> [ 0, 1 ]
when: ' module sync is started'
@@ -81,9 +90,11 @@
1 * spiedAsyncTaskExecutor.executeTask(*_)
}
- def 'Module sync advised cm handles already handled.'() {
- given: 'sync utilities returns a advise cm handles'
- mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(1)
+ def 'Module sync advised cm handle already handled by other thread.'() {
+ given: 'module sync utilities returns an advised cm handle'
+ mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(1)
+ and: 'the work queue is not locked'
+ mockWorkQueueLock.tryLock() >> true
and: 'the executor has a thread available'
spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 1
and: 'the semaphore cache indicates the cm handle is already being processed'
@@ -98,7 +109,7 @@
given: 'there is still a cm handle in the queue'
moduleSyncWorkQueue.offer(new DataNode())
and: 'sync utilities returns many advise cm handles'
- mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(500)
+ mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(500)
and: 'the executor has plenty threads available'
spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> 10
when: ' module sync is started'
@@ -108,18 +119,42 @@
}
def 'Reset failed cm handles.'() {
- given: 'sync utilities returns failed cm handles'
+ given: 'module sync utilities returns failed cm handles'
def failedCmHandles = [new YangModelCmHandle()]
- mockSyncUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> failedCmHandles
+ mockModuleOperationsUtils.getCmHandlesThatFailedModelSyncOrUpgrade() >> failedCmHandles
when: 'reset failed cm handles is started'
objectUnderTest.setPreviouslyLockedCmHandlesToAdvised()
then: 'it is delegated to the module sync task (service)'
1 * mockModuleSyncTasks.setCmHandlesToAdvised(failedCmHandles)
}
+ def 'Module Sync Locking.'() {
+ given: 'module sync utilities returns an advised cm handle'
+ mockModuleOperationsUtils.getAdvisedCmHandles() >> createDataNodes(1)
+ and: 'can lock is : #canLock'
+ mockWorkQueueLock.tryLock() >> canLock
+ when: 'attempt to populate the work queue'
+ objectUnderTest.populateWorkQueueIfNeeded()
+ then: 'the queue remains empty is #expectQueueRemainsEmpty'
+ assert moduleSyncWorkQueue.isEmpty() == expectQueueRemainsEmpty
+ where: 'the following lock states are applied'
+ canLock | expectQueueRemainsEmpty
+ false | true
+ true | false
+ }
+
+ def 'Sleeper gets interrupted.'() {
+ given: 'sleeper gets interrupted'
+ spiedSleeper.haveALittleRest(_) >> { throw new InterruptedException() }
+ when: 'the watchdog attempts to sleep to save cpu cycles'
+ objectUnderTest.preventBusyWait()
+ then: 'no exception is thrown'
+ noExceptionThrown()
+ }
+
def createDataNodes(numberOfDataNodes) {
def dataNodes = []
- (1..numberOfDataNodes).each {dataNodes.add(new DataNode())}
+ numberOfDataNodes.times { dataNodes.add(new DataNode()) }
return dataNodes
}
}
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
index 587cbae..759eccd 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/base/CpsIntegrationSpecBase.groovy
@@ -53,6 +53,7 @@
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.context.annotation.ComponentScan
import org.springframework.data.jpa.repository.config.EnableJpaRepositories
+import org.springframework.test.context.ActiveProfiles
import org.springframework.test.web.servlet.MockMvc
import org.testcontainers.spock.Testcontainers
import spock.lang.Shared
@@ -61,6 +62,7 @@
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter
+import java.util.concurrent.BlockingQueue
import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DATASPACE_NAME
import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_ANCHOR
@@ -73,6 +75,7 @@
@EnableJpaRepositories(basePackageClasses = [DataspaceRepository])
@ComponentScan(basePackages = ['org.onap.cps'])
@EntityScan('org.onap.cps.ri.models')
+@ActiveProfiles('module-sync-delayed')
abstract class CpsIntegrationSpecBase extends Specification {
@Shared
@@ -118,6 +121,9 @@
ModuleSyncWatchdog moduleSyncWatchdog
@Autowired
+ BlockingQueue<DataNode> moduleSyncWorkQueue
+
+ @Autowired
JsonObjectMapper jsonObjectMapper
@Autowired
@@ -244,26 +250,39 @@
}
def registerCmHandle(dmiPlugin, cmHandleId, moduleSetTag, alternateId) {
- def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: cmHandleId, moduleSetTag: moduleSetTag, alternateId: alternateId)
- networkCmProxyInventoryFacade.updateDmiRegistrationAndSyncModule(new DmiPluginRegistration(dmiPlugin: dmiPlugin, createdCmHandles: [cmHandleToCreate]))
+ registerCmHandleWithoutWaitForReady(dmiPlugin, cmHandleId, moduleSetTag, alternateId)
+ moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> {
CmHandleState.READY == networkCmProxyInventoryFacade.getCmHandleCompositeState(cmHandleId).cmHandleState
})
}
+ def registerCmHandleWithoutWaitForReady(dmiPlugin, cmHandleId, moduleSetTag, alternateId) {
+ def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: cmHandleId, moduleSetTag: moduleSetTag, alternateId: alternateId)
+ networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, createdCmHandles: [cmHandleToCreate]))
+ }
+
+ def registerSequenceOfCmHandlesWithoutWaitForReady(dmiPlugin, moduleSetTag, numberOfCmHandles) {
+ def cmHandles = []
+ (1..numberOfCmHandles).each {
+ def cmHandle = new NcmpServiceCmHandle(cmHandleId: 'ch-'+it, moduleSetTag: moduleSetTag, alternateId: NO_ALTERNATE_ID)
+ cmHandles.add(cmHandle)
+ }
+ networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, createdCmHandles: cmHandles))
+ }
+
def deregisterCmHandle(dmiPlugin, cmHandleId) {
deregisterCmHandles(dmiPlugin, [cmHandleId])
}
def deregisterCmHandles(dmiPlugin, cmHandleIds) {
- networkCmProxyInventoryFacade.updateDmiRegistrationAndSyncModule(new DmiPluginRegistration(dmiPlugin: dmiPlugin, removedCmHandles: cmHandleIds))
+ networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, removedCmHandles: cmHandleIds))
}
- def overrideCmHandleLastUpdateTime(cmHandleId, newUpdateTime) {
- String ISO_TIMESTAMP_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
- DateTimeFormatter ISO_TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern(ISO_TIMESTAMP_PATTERN);
- def jsonForUpdate = '{ "state": { "last-update-time": "%s" } }'.formatted(ISO_TIMESTAMP_FORMATTER.format(newUpdateTime))
- cpsDataService.updateNodeLeaves(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR,
- NCMP_DMI_REGISTRY_PARENT + "/cm-handles[@id='${cmHandleId}']", jsonForUpdate, now, ContentType.JSON)
+ def deregisterSequenceOfCmHandles(dmiPlugin, numberOfCmHandles) {
+ def cmHandleIds = []
+ (1..numberOfCmHandles).each { cmHandleIds.add('ch-'+it) }
+ networkCmProxyInventoryFacade.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: dmiPlugin, removedCmHandles: cmHandleIds))
}
+
}
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy
index 10a9f15..19b10a3 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy
@@ -32,42 +32,48 @@
import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
import org.onap.cps.ncmp.impl.inventory.models.CmHandleState
import org.onap.cps.ncmp.impl.inventory.models.LockReasonCategory
+import spock.lang.Ignore
import spock.util.concurrent.PollingConditions
import java.time.Duration
-import java.time.OffsetDateTime
class CmHandleCreateSpec extends CpsIntegrationSpecBase {
NetworkCmProxyInventoryFacade objectUnderTest
+ def uniqueId = 'ch-unique-id-for-create-test'
- def kafkaConsumer = KafkaTestContainer.getConsumer('ncmp-group', StringDeserializer.class)
+ def kafkaConsumer = KafkaTestContainer.getConsumer('test-group', StringDeserializer.class)
def setup() {
objectUnderTest = networkCmProxyInventoryFacade
}
+ @Ignore
def 'CM Handle registration is successful.'() {
given: 'DMI will return modules when requested'
dmiDispatcher1.moduleNamesPerCmHandleId['ch-1'] = ['M1', 'M2']
+ dmiDispatcher1.moduleNamesPerCmHandleId[uniqueId] = ['M1', 'M2']
and: 'consumer subscribed to topic'
kafkaConsumer.subscribe(['ncmp-events'])
when: 'a CM-handle is registered for creation'
- def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: 'ch-1')
+ def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: uniqueId)
def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
- def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'registration gives successful response'
- assert dmiPluginRegistrationResponse.createdCmHandles == [CmHandleRegistrationResponse.createSuccessResponse('ch-1')]
+ assert dmiPluginRegistrationResponse.createdCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(uniqueId)]
and: 'CM-handle is initially in ADVISED state'
- assert CmHandleState.ADVISED == objectUnderTest.getCmHandleCompositeState('ch-1').cmHandleState
+ assert CmHandleState.ADVISED == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState
+
+ and: 'the module sync watchdog is triggered'
+ moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
and: 'CM-handle goes to READY state after module sync'
new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> {
- assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState('ch-1').cmHandleState
+ assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(uniqueId).cmHandleState
})
and: 'the messages is polled'
@@ -76,13 +82,20 @@
and: 'the newest lcm event notification is received with READY state'
def notificationMessage = jsonObjectMapper.convertJsonString(records.last().value().toString(), LcmEvent)
+ /*TODO (Toine) This test was failing intermittently (when running as part of suite).
+ I suspect that it often gave false positives as the message being assert here was any random message created by previous tests
+ By checking the cm-handle and using an unique cm-handle in this test this flaw became obvious.
+ I have now ignored this test as it is out of scope of this commit to fix it.
+ Created: https://lf-onap.atlassian.net/browse/CPS-2468 to fix this instead
+ */
+ assert notificationMessage.event.cmHandleId == uniqueId
assert notificationMessage.event.newValues.cmHandleState.value() == 'READY'
and: 'the CM-handle has expected modules'
- assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences('ch-1').moduleName.sort()
+ assert ['M1', 'M2'] == objectUnderTest.getYangResourcesModuleReferences(uniqueId).moduleName.sort()
cleanup: 'deregister CM handle'
- deregisterCmHandle(DMI1_URL, 'ch-1')
+ deregisterCmHandle(DMI1_URL, uniqueId)
}
def 'CM Handle goes to LOCKED state when DMI gives error during module sync.'() {
@@ -92,7 +105,10 @@
when: 'a CM-handle is registered for creation'
def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: 'ch-1')
def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate])
- objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
+
+ and: 'the module sync watchdog is triggered'
+ moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
then: 'CM-handle goes to LOCKED state with reason MODULE_SYNC_FAILED'
new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> {
@@ -117,7 +133,10 @@
when: 'a CM-handle is registered for creation with moduleSetTag "B"'
def cmHandleToCreate = new NcmpServiceCmHandle(cmHandleId: 'ch-3', moduleSetTag: 'B')
- objectUnderTest.updateDmiRegistrationAndSyncModule(new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate]))
+ objectUnderTest.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: [cmHandleToCreate]))
+
+ and: 'the module sync watchdog is triggered'
+ moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
then: 'the CM-handle goes to READY state'
new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> {
@@ -151,7 +170,7 @@
new NcmpServiceCmHandle(cmHandleId: 'ch-7', alternateId: 'duplicate-alt-id'),
]
def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: cmHandlesToCreate)
- def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
then: 'registration gives expected responses'
assert dmiPluginRegistrationResponse.createdCmHandles.sort { it.cmHandle } == [
@@ -173,7 +192,11 @@
when: 'CM-handles are registered for creation'
def cmHandlesToCreate = [new NcmpServiceCmHandle(cmHandleId: 'ch-1'), new NcmpServiceCmHandle(cmHandleId: 'ch-2')]
def dmiPluginRegistration = new DmiPluginRegistration(dmiPlugin: DMI1_URL, createdCmHandles: cmHandlesToCreate)
- objectUnderTest.updateDmiRegistrationAndSyncModule(dmiPluginRegistration)
+ objectUnderTest.updateDmiRegistration(dmiPluginRegistration)
+
+ and: 'the module sync watchdog is triggered'
+ moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
+
then: 'CM-handles go to LOCKED state'
new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> {
assert objectUnderTest.getCmHandleCompositeState('ch-1').cmHandleState == CmHandleState.LOCKED
@@ -183,6 +206,9 @@
dmiDispatcher1.moduleNamesPerCmHandleId = ['ch-1': ['M1', 'M2'], 'ch-2': ['M1', 'M2']]
dmiDispatcher1.isAvailable = true
+ and: 'the module sync watchdog is triggered TWICE'
+ 2.times { moduleSyncWatchdog.moduleSyncAdvisedCmHandles() }
+
then: 'Both CM-handles go to READY state'
new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> {
['ch-1', 'ch-2'].each { cmHandleId ->
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleUpdateSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleUpdateSpec.groovy
index 2d1588e..67011f8 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleUpdateSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleUpdateSpec.groovy
@@ -44,7 +44,7 @@
when: "CM-handle is registered for update with new alternate ID: $newAlternateId"
def cmHandleToUpdate = new NcmpServiceCmHandle(cmHandleId: 'ch-1', alternateId: newAlternateId)
def dmiPluginRegistrationResponse =
- objectUnderTest.updateDmiRegistrationAndSyncModule(new DmiPluginRegistration(dmiPlugin: DMI1_URL, updatedCmHandles: [cmHandleToUpdate]))
+ objectUnderTest.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: DMI1_URL, updatedCmHandles: [cmHandleToUpdate]))
then: 'registration gives successful response'
assert dmiPluginRegistrationResponse.updatedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse('ch-1')]
@@ -74,7 +74,7 @@
when: 'a CM-handle is registered for update with new alternate ID'
def cmHandleToUpdate = new NcmpServiceCmHandle(cmHandleId: 'ch-1', alternateId: 'new')
def dmiPluginRegistrationResponse =
- objectUnderTest.updateDmiRegistrationAndSyncModule(new DmiPluginRegistration(dmiPlugin: DMI1_URL, updatedCmHandles: [cmHandleToUpdate]))
+ objectUnderTest.updateDmiRegistration(new DmiPluginRegistration(dmiPlugin: DMI1_URL, updatedCmHandles: [cmHandleToUpdate]))
then: 'registration gives failure response, due to alternate ID being already associated'
assert dmiPluginRegistrationResponse.updatedCmHandles == [CmHandleRegistrationResponse.createFailureResponse('ch-1', NcmpResponseStatus.ALTERNATE_ID_ALREADY_ASSOCIATED)]
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleUpgradeSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleUpgradeSpec.groovy
index f93f58c..6444937 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleUpgradeSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleUpgradeSpec.groovy
@@ -48,7 +48,7 @@
when: "the CM-handle is upgraded with given moduleSetTag '${updatedModuleSetTag}'"
def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [CM_HANDLE_ID], moduleSetTag: updatedModuleSetTag)
- def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistrationAndSyncModule(
+ def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(
new DmiPluginRegistration(dmiPlugin: DMI1_URL, upgradedCmHandles: cmHandlesToUpgrade))
then: 'registration gives successful response'
@@ -63,6 +63,9 @@
when: 'DMI will return different modules for upgrade: M1 and M3'
dmiDispatcher1.moduleNamesPerCmHandleId[CM_HANDLE_ID] = ['M1', 'M3']
+ and: 'the module sync watchdog is triggered twice'
+ 2.times { moduleSyncWatchdog.moduleSyncAdvisedCmHandles() }
+
then: 'CM-handle goes to READY state'
new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> {
assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(CM_HANDLE_ID).cmHandleState
@@ -98,12 +101,15 @@
when: "CM-handle is upgraded to moduleSetTag '${updatedModuleSetTag}'"
def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [CM_HANDLE_ID], moduleSetTag: updatedModuleSetTag)
- def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistrationAndSyncModule(
+ def dmiPluginRegistrationResponse = objectUnderTest.updateDmiRegistration(
new DmiPluginRegistration(dmiPlugin: DMI1_URL, upgradedCmHandles: cmHandlesToUpgrade))
then: 'registration gives successful response'
assert dmiPluginRegistrationResponse.upgradedCmHandles == [CmHandleRegistrationResponse.createSuccessResponse(CM_HANDLE_ID)]
+ and: 'the module sync watchdog is triggered twice'
+ 2.times { moduleSyncWatchdog.moduleSyncAdvisedCmHandles() }
+
and: 'CM-handle goes to READY state'
new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> {
assert CmHandleState.READY == objectUnderTest.getCmHandleCompositeState(CM_HANDLE_ID).cmHandleState
@@ -132,7 +138,7 @@
when: 'CM-handle is upgraded with the same moduleSetTag'
def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [CM_HANDLE_ID], moduleSetTag: 'same')
- objectUnderTest.updateDmiRegistrationAndSyncModule(
+ objectUnderTest.updateDmiRegistration(
new DmiPluginRegistration(dmiPlugin: DMI1_URL, upgradedCmHandles: cmHandlesToUpgrade))
then: 'CM-handle remains in READY state'
@@ -157,9 +163,12 @@
when: 'the CM-handle is upgraded'
def cmHandlesToUpgrade = new UpgradedCmHandles(cmHandles: [CM_HANDLE_ID], moduleSetTag: 'newTag')
- objectUnderTest.updateDmiRegistrationAndSyncModule(
+ objectUnderTest.updateDmiRegistration(
new DmiPluginRegistration(dmiPlugin: DMI1_URL, upgradedCmHandles: cmHandlesToUpgrade))
+ and: 'the module sync watchdog is triggered twice'
+ 2.times { moduleSyncWatchdog.moduleSyncAdvisedCmHandles() }
+
then: 'CM-handle goes to LOCKED state with reason MODULE_UPGRADE_FAILED'
new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> {
def cmHandleCompositeState = objectUnderTest.getCmHandleCompositeState(CM_HANDLE_ID)
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy
new file mode 100644
index 0000000..e0bb437
--- /dev/null
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/ModuleSyncWatchdogIntegrationSpec.groovy
@@ -0,0 +1,99 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the 'License');
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an 'AS IS' BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.integration.functional.ncmp
+
+import org.onap.cps.integration.base.CpsIntegrationSpecBase
+import org.onap.cps.ncmp.impl.inventory.sync.ModuleSyncWatchdog
+
+import java.util.concurrent.Executors
+
+class ModuleSyncWatchdogIntegrationSpec extends CpsIntegrationSpecBase {
+
+ ModuleSyncWatchdog objectUnderTest
+
+ def executorService = Executors.newFixedThreadPool(2)
+ def SYNC_SAMPLE_SIZE = 100
+
+ def setup() {
+ objectUnderTest = moduleSyncWatchdog
+ registerSequenceOfCmHandlesWithoutWaitForReady(DMI1_URL, NO_MODULE_SET_TAG, SYNC_SAMPLE_SIZE)
+ }
+
+ def cleanup() {
+ try {
+ deregisterSequenceOfCmHandles(DMI1_URL, SYNC_SAMPLE_SIZE)
+ moduleSyncWorkQueue.clear()
+ } finally {
+ executorService.shutdownNow()
+ }
+ }
+
+ def 'Watchdog is disabled for test.'() {
+ when: 'wait a while but less then the initial delay of 10 minutes'
+ Thread.sleep(3000)
+ then: 'the work queue remains empty'
+ assert moduleSyncWorkQueue.isEmpty()
+ }
+
+ def 'Populate module sync work queue simultaneously on two parallel threads (CPS-2403).'() {
+ // This test failed before bug https://lf-onap.atlassian.net/browse/CPS-2403 was fixed
+ given: 'the queue is empty at the start'
+ assert moduleSyncWorkQueue.isEmpty()
+ when: 'attempt to populate the queue on the main (test) and another parallel thread at the same time'
+ objectUnderTest.populateWorkQueueIfNeeded()
+ executorService.execute(populateQueueWithoutDelay)
+ and: 'wait a little (to give all threads time to complete their task)'
+ Thread.sleep(50)
+ then: 'the queue size is exactly the sample size'
+ assert moduleSyncWorkQueue.size() == SYNC_SAMPLE_SIZE
+ }
+
+ def 'Populate module sync work queue on two parallel threads with a slight difference in start time.'() {
+ // This test proved that the issue in CPS-2403 did not arise if the the queue was populated and given time to be distributed
+ given: 'the queue is empty at the start'
+ assert moduleSyncWorkQueue.isEmpty()
+ when: 'attempt to populate the queue on the main (test) and another parallel thread a little later'
+ objectUnderTest.populateWorkQueueIfNeeded()
+ executorService.execute(populateQueueWithDelay)
+ and: 'wait a little (to give all threads time to complete their task)'
+ Thread.sleep(50)
+ then: 'the queue size is exactly the sample size'
+ assert moduleSyncWorkQueue.size() == SYNC_SAMPLE_SIZE
+ }
+
+ def populateQueueWithoutDelay = () -> {
+ try {
+ objectUnderTest.populateWorkQueueIfNeeded()
+ } catch (InterruptedException e) {
+ e.printStackTrace()
+ }
+ }
+
+ def populateQueueWithDelay = () -> {
+ try {
+ Thread.sleep(10)
+ objectUnderTest.populateWorkQueueIfNeeded()
+ } catch (InterruptedException e) {
+ e.printStackTrace()
+ }
+ }
+
+}
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/RestApiSpec.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/RestApiSpec.groovy
index 1e1af55..2655628 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/RestApiSpec.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/RestApiSpec.groovy
@@ -44,6 +44,8 @@
def requestBody = '{"dmiPlugin":"'+DMI1_URL+'","createdCmHandles":[{"cmHandle":"ch-1","alternateId":"alt-1"},{"cmHandle":"ch-2","alternateId":"alt-2"},{"cmHandle":"ch-3","alternateId":"alt-3"}]}'
mvc.perform(post('/ncmpInventory/v1/ch').contentType(MediaType.APPLICATION_JSON).content(requestBody))
.andExpect(status().is2xxSuccessful())
+ and: 'the module sync watchdog is triggered'
+ moduleSyncWatchdog.moduleSyncAdvisedCmHandles()
then: 'CM-handles go to READY state'
new PollingConditions().within(MODULE_SYNC_WAIT_TIME_IN_SECONDS, () -> {
(1..3).each {
diff --git a/integration-test/src/test/groovy/org/onap/cps/integration/performance/base/NcmpPerfTestBase.groovy b/integration-test/src/test/groovy/org/onap/cps/integration/performance/base/NcmpPerfTestBase.groovy
index f6ae27d..fb5a0c3 100644
--- a/integration-test/src/test/groovy/org/onap/cps/integration/performance/base/NcmpPerfTestBase.groovy
+++ b/integration-test/src/test/groovy/org/onap/cps/integration/performance/base/NcmpPerfTestBase.groovy
@@ -27,11 +27,13 @@
import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DATASPACE_NAME
import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_ANCHOR
+import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_PARENT
class NcmpPerfTestBase extends PerfTestBase {
def static NCMP_PERFORMANCE_TEST_DATASPACE = 'ncmpPerformance'
- def static REGISTRY_ANCHOR = 'ncmp-registry'
+ def static REGISTRY_ANCHOR = NCMP_DMI_REGISTRY_ANCHOR
+ def static REGISTRY_PARENT = NCMP_DMI_REGISTRY_PARENT
def static REGISTRY_SCHEMA_SET = 'registrySchemaSet'
def static TOTAL_CM_HANDLES = 20_000
def static CM_DATA_SUBSCRIPTIONS_ANCHOR = 'cm-data-subscriptions'
@@ -70,30 +72,30 @@
}
def createRegistrySchemaSet() {
- def modelAsString = readResourceDataFile('ncmp-registry/dmi-registry@2024-02-23.yang')
+ def modelAsString = readResourceDataFile('inventory/dmi-registry@2024-02-23.yang')
cpsModuleService.createSchemaSet(NCMP_PERFORMANCE_TEST_DATASPACE, REGISTRY_SCHEMA_SET, [registry: modelAsString])
}
def addRegistryData() {
cpsAnchorService.createAnchor(NCMP_PERFORMANCE_TEST_DATASPACE, REGISTRY_SCHEMA_SET, REGISTRY_ANCHOR)
cpsDataService.saveData(NCMP_PERFORMANCE_TEST_DATASPACE, REGISTRY_ANCHOR, '{"dmi-registry": []}', now)
- def innerNodeJsonTemplate = readResourceDataFile('ncmp-registry/innerNode.json')
+ def cmHandleJsonTemplate = readResourceDataFile('inventory/cmHandleTemplate.json')
def batchSize = 100
for (def i = 0; i < TOTAL_CM_HANDLES; i += batchSize) {
- def data = '{ "cm-handles": [' + (1..batchSize).collect { innerNodeJsonTemplate.replace('CMHANDLE_ID_HERE', (it + i).toString()) }.join(',') + ']}'
- cpsDataService.saveListElements(NCMP_PERFORMANCE_TEST_DATASPACE, REGISTRY_ANCHOR, '/dmi-registry', data, now, ContentType.JSON)
+ def data = '{ "cm-handles": [' + (1..batchSize).collect { cmHandleJsonTemplate.replace('CM_HANDLE_ID_HERE', (it + i).toString()) }.join(',') + ']}'
+ cpsDataService.saveListElements(NCMP_PERFORMANCE_TEST_DATASPACE, REGISTRY_ANCHOR, REGISTRY_PARENT, data, now, ContentType.JSON)
}
}
def addRegistryDataWithAlternateIdAsPath() {
- def innerNodeJsonTemplate = readResourceDataFile('ncmp-registry/innerCmHandleNode.json')
+ def cmHandleWithAlternateIdTemplate = readResourceDataFile('inventory/cmHandleWithAlternateIdTemplate.json')
def batchSize = 10
for (def i = 0; i < TOTAL_CM_HANDLES; i += batchSize) {
def data = '{ "cm-handles": [' + (1..batchSize).collect {
- innerNodeJsonTemplate.replace('CM_HANDLE_ID_HERE', (it + i).toString())
+ cmHandleWithAlternateIdTemplate.replace('CM_HANDLE_ID_HERE', (it + i).toString())
.replace('ALTERNATE_ID_AS_PATH', (it + i).toString())
}.join(',') + ']}'
- cpsDataService.saveListElements(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR, '/dmi-registry', data, now, ContentType.JSON)
+ cpsDataService.saveListElements(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR, REGISTRY_PARENT, data, now, ContentType.JSON)
}
}
@@ -117,7 +119,7 @@
def result = cpsDataService.getDataNodes(NCMP_PERFORMANCE_TEST_DATASPACE, REGISTRY_ANCHOR, '/', FetchDescendantsOption.OMIT_DESCENDANTS)
resourceMeter.stop()
then: 'expected data exists'
- assert result.xpath == ['/dmi-registry']
+ assert result.xpath == [REGISTRY_PARENT]
and: 'operation completes within expected time'
recordAndAssertResourceUsage('NCMP pre-load test data',
15, resourceMeter.totalTimeInSeconds,
diff --git a/integration-test/src/test/resources/application-module-sync-delayed.yml b/integration-test/src/test/resources/application-module-sync-delayed.yml
new file mode 100644
index 0000000..7b9c6ae
--- /dev/null
+++ b/integration-test/src/test/resources/application-module-sync-delayed.yml
@@ -0,0 +1,23 @@
+# ============LICENSE_START=======================================================
+# Copyright (C) 2024 Nordix Foundation.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END=========================================================
+
+test:
+ ncmp:
+ timers:
+ advised-modules-sync:
+ initial-delay-ms: 600000
+
diff --git a/integration-test/src/test/resources/application.yml b/integration-test/src/test/resources/application.yml
index 853797a..b786a3d 100644
--- a/integration-test/src/test/resources/application.yml
+++ b/integration-test/src/test/resources/application.yml
@@ -179,7 +179,7 @@
timers:
advised-modules-sync:
- sleep-time-ms: 1000
+ sleep-time-ms: 1000000
cm-handle-data-sync:
sleep-time-ms: 30000
subscription-forwarding:
diff --git a/integration-test/src/test/resources/data/ncmp-registry/innerNode.json b/integration-test/src/test/resources/data/inventory/cmHandleTemplate.json
similarity index 86%
rename from integration-test/src/test/resources/data/ncmp-registry/innerNode.json
rename to integration-test/src/test/resources/data/inventory/cmHandleTemplate.json
index b6c65f3..6577f4e 100644
--- a/integration-test/src/test/resources/data/ncmp-registry/innerNode.json
+++ b/integration-test/src/test/resources/data/inventory/cmHandleTemplate.json
@@ -1,6 +1,6 @@
{
- "id": "cm-CMHANDLE_ID_HERE",
- "alternate-id": "alt-CMHANDLE_ID_HERE",
+ "id": "cm-CM_HANDLE_ID_HERE",
+ "alternate-id": "alt-CM_HANDLE_ID_HERE",
"module-set-tag": "my-module-set-tag",
"dmi-service-name": "http://ncmp-dmi-plugin-stub:8080",
"dmi-data-service-name": "",
@@ -21,4 +21,4 @@
}
}
}
-}
\ No newline at end of file
+}
diff --git a/integration-test/src/test/resources/data/ncmp-registry/innerCmHandleNode.json b/integration-test/src/test/resources/data/inventory/cmHandleWithAlternateIdTemplate.json
similarity index 100%
rename from integration-test/src/test/resources/data/ncmp-registry/innerCmHandleNode.json
rename to integration-test/src/test/resources/data/inventory/cmHandleWithAlternateIdTemplate.json
diff --git a/integration-test/src/test/resources/data/ncmp-registry/dmi-registry@2024-02-23.yang b/integration-test/src/test/resources/data/inventory/dmi-registry@2024-02-23.yang
similarity index 100%
rename from integration-test/src/test/resources/data/ncmp-registry/dmi-registry@2024-02-23.yang
rename to integration-test/src/test/resources/data/inventory/dmi-registry@2024-02-23.yang
diff --git a/policy-executor-stub/src/main/java/org/onap/cps/policyexecutor/stub/controller/Sleeper.java b/policy-executor-stub/src/main/java/org/onap/cps/policyexecutor/stub/controller/Sleeper.java
index 8f904cc..789201f 100644
--- a/policy-executor-stub/src/main/java/org/onap/cps/policyexecutor/stub/controller/Sleeper.java
+++ b/policy-executor-stub/src/main/java/org/onap/cps/policyexecutor/stub/controller/Sleeper.java
@@ -24,8 +24,8 @@
import org.springframework.stereotype.Service;
/**
- * This class is a successfull experiment to extract out sleep functionality so the interrupted exception handling can
- * be covered with a test (e.g. using spy ion Sleeper) and help to get too 100% code coverage.
+ * This class is to extract out sleep functionality so the interrupted exception handling can
+ * be covered with a test (e.g. using spy on Sleeper) and help to get too 100% code coverage.
*/
@Service
public class Sleeper {