Performance Improvement: Use hazelcast blocking queue
- Introducing hazelcast for queue and progress map
- process batch of 100 at the time
- decreased module sync watchdog sleeptime to 5 seconds
- separate module sync tasks in new class and some other async preparations and easier testing
- tests for batching in module sync watchdog
- remove qualifiers annotation (support) where no longer needed
Issue-ID: CPS-1210
Issue-ID: CPS-1126
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
Change-Id: I0a7d3755bf774e27c5688741bddb01f427d4a8a7
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml
index 1755134..f7fa735 100644
--- a/cps-application/src/main/resources/application.yml
+++ b/cps-application/src/main/resources/application.yml
@@ -167,7 +167,7 @@
timers:
advised-modules-sync:
- sleep-time-ms: 30000
+ sleep-time-ms: 5000
locked-modules-sync:
sleep-time-ms: 300000
cm-handle-data-sync:
diff --git a/cps-ncmp-service/lombok.config b/cps-ncmp-service/lombok.config
index b60a192..1fba85b 100644
--- a/cps-ncmp-service/lombok.config
+++ b/cps-ncmp-service/lombok.config
@@ -18,4 +18,3 @@
config.stopBubbling = true
lombok.addLombokGeneratedAnnotation = true
-lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier
\ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java
new file mode 100644
index 0000000..abde4c2
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java
@@ -0,0 +1,113 @@
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.config.embeddedcache;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.MapConfig;
+import com.hazelcast.config.NamedConfig;
+import com.hazelcast.config.QueueConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.onap.cps.spi.model.DataNode;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Core infrastructure of the hazelcast distributed caches for Module Sync and Data Sync use cases.
+ */
+@Configuration
+public class SynchronizationCacheConfig {
+
+ private static final QueueConfig commonQueueConfig = createQueueConfig();
+ private static final MapConfig moduleSyncStartedConfig =
+ createMapConfig("moduleSyncStartedConfig", TimeUnit.MINUTES.toSeconds(1));
+ private static final MapConfig dataSyncSemaphoresConfig =
+ createMapConfig("dataSyncSemaphoresConfig", TimeUnit.MINUTES.toSeconds(30));
+
+ /**
+ * Module Sync Distributed Queue Instance.
+ *
+ * @return queue of cm handles (data nodes) that need module sync
+ */
+ @Bean
+ public BlockingQueue<DataNode> moduleSyncWorkQueue() {
+ return createHazelcastInstance("moduleSyncWorkQueue", commonQueueConfig)
+ .getQueue("moduleSyncWorkQueue");
+ }
+
+ /**
+ * Module Sync started (and maybe finished) on cm handles (ids).
+ *
+ * @return Map of cm handles (ids) and objects (not used really) for which module sync has started or been completed
+ */
+ @Bean
+ public Map<String, Object> moduleSyncStartedOnCmHandles() {
+ return createHazelcastInstance("moduleSyncStartedOnCmHandles", moduleSyncStartedConfig)
+ .getMap("moduleSyncStartedOnCmHandles");
+ }
+
+ /**
+ * Data Sync Distributed Map Instance.
+ *
+ * @return configured map of data sync semaphores
+ */
+ @Bean
+ public Map<String, Boolean> dataSyncSemaphores() {
+ return createHazelcastInstance("dataSyncSemaphores", dataSyncSemaphoresConfig)
+ .getMap("dataSyncSemaphores");
+ }
+
+ private HazelcastInstance createHazelcastInstance(
+ final String hazelcastInstanceName, final NamedConfig namedConfig) {
+ return Hazelcast.newHazelcastInstance(initializeConfig(hazelcastInstanceName, namedConfig));
+ }
+
+ private Config initializeConfig(final String instanceName, final NamedConfig namedConfig) {
+ final Config config = new Config(instanceName);
+ if (namedConfig instanceof MapConfig) {
+ config.addMapConfig((MapConfig) namedConfig);
+ }
+ if (namedConfig instanceof QueueConfig) {
+ config.addQueueConfig((QueueConfig) namedConfig);
+ }
+ config.setClusterName("synchronization-caches");
+ return config;
+ }
+
+ private static QueueConfig createQueueConfig() {
+ final QueueConfig commonQueueConfig = new QueueConfig("defaultQueueConfig");
+ commonQueueConfig.setBackupCount(3);
+ commonQueueConfig.setAsyncBackupCount(3);
+ return commonQueueConfig;
+ }
+
+ private static MapConfig createMapConfig(final String configName, final long timeToLiveSeconds) {
+ final MapConfig mapConfig = new MapConfig(configName);
+ mapConfig.setBackupCount(3);
+ mapConfig.setAsyncBackupCount(3);
+ mapConfig.setTimeToLiveSeconds((int) timeToLiveSeconds);
+ return mapConfig;
+ }
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java
deleted file mode 100644
index 571558a..0000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * ===========LICENSE_START========================================================
- * Copyright (C) 2022 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.config.embeddedcache;
-
-import com.hazelcast.config.Config;
-import com.hazelcast.config.MapConfig;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-/**
- * Core infrastructure of the hazelcast distributed map for Module Sync and Data Sync use cases.
- */
-@Configuration
-public class SynchronizationSemaphoresConfig {
-
- private static final int TIME_TO_LIVE_IN_SECONDS = (int) TimeUnit.MINUTES.toSeconds(30);
-
- /**
- * Module Sync Distributed Map Instance.
- *
- * @return configured map of module sync semaphores
- */
- @Bean
- public ConcurrentMap<String, Boolean> moduleSyncSemaphores() {
- return createHazelcastInstance("moduleSyncSemaphores", "moduleSyncSemaphoresConfig")
- .getMap("moduleSyncSemaphores");
- }
-
- /**
- * Data Sync Distributed Map Instance.
- *
- * @return configured map of data sync semaphores
- */
- @Bean
- public ConcurrentMap<String, Boolean> dataSyncSemaphores() {
- return createHazelcastInstance("dataSyncSemaphores", "dataSyncSemaphoresConfig")
- .getMap("dataSyncSemaphores");
- }
-
- private HazelcastInstance createHazelcastInstance(
- final String hazelcastInstanceName, final String configMapName) {
- return Hazelcast.newHazelcastInstance(
- initializeDefaultMapConfig(hazelcastInstanceName, configMapName));
- }
-
- private Config initializeDefaultMapConfig(final String instanceName, final String configName) {
- final Config config = new Config(instanceName);
- final MapConfig mapConfig = new MapConfig(configName);
- mapConfig.setTimeToLiveSeconds(TIME_TO_LIVE_IN_SECONDS);
- mapConfig.setBackupCount(3);
- mapConfig.setAsyncBackupCount(3);
- config.addMapConfig(mapConfig);
- config.setClusterName("synchronization-semaphores");
- return config;
- }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java
index 45ba078..107f8a0 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java
@@ -21,7 +21,7 @@
package org.onap.cps.ncmp.api.inventory.sync;
import java.time.OffsetDateTime;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
import java.util.function.Consumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -29,7 +29,6 @@
import org.onap.cps.ncmp.api.inventory.CompositeState;
import org.onap.cps.ncmp.api.inventory.DataStoreSyncState;
import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
-import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@@ -47,8 +46,7 @@
private final SyncUtils syncUtils;
- @Qualifier("dataSyncSemaphores")
- private final ConcurrentMap<String, Boolean> dataSyncSemaphores;
+ private final Map<String, Boolean> dataSyncSemaphores;
/**
* Execute Cm Handle poll which queries the cm handle state in 'READY' and Operational Datastore Sync State in
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java
index c574aa6..7f61c47 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java
@@ -86,19 +86,17 @@
}
/**
- * Deletes the SchemaSet for provided cmHandle if the SchemaSet Exists.
+ * Deletes the SchemaSet for schema set id if the SchemaSet Exists.
*
- * @param yangModelCmHandle the yang model of cm handle.
+ * @param schemaSetId the schema set id to be deleted
*/
- public void deleteSchemaSetIfExists(final YangModelCmHandle yangModelCmHandle) {
- final String schemaSetAndAnchorName = yangModelCmHandle.getId();
+ public void deleteSchemaSetIfExists(final String schemaSetId) {
try {
- cpsModuleService.deleteSchemaSet(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetAndAnchorName,
+ cpsModuleService.deleteSchemaSet(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetId,
CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED);
- log.debug("SchemaSet for {} has been deleted. Ready to be recreated.", schemaSetAndAnchorName);
+ log.debug("SchemaSet for {} has been deleted. Ready to be recreated.", schemaSetId);
} catch (final SchemaSetNotFoundException e) {
- log.debug("No SchemaSet for {}. Assuming CmHandle has not been previously Module Synced.",
- schemaSetAndAnchorName);
+ log.debug("No SchemaSet for {}. Assuming CmHandle has not been previously Module Synced.", schemaSetId);
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java
new file mode 100644
index 0000000..5e26650
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java
@@ -0,0 +1,113 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.inventory.sync;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler;
+import org.onap.cps.ncmp.api.impl.utils.YangDataConverter;
+import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
+import org.onap.cps.ncmp.api.inventory.CmHandleState;
+import org.onap.cps.ncmp.api.inventory.CompositeState;
+import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
+import org.onap.cps.ncmp.api.inventory.LockReasonCategory;
+import org.onap.cps.spi.model.DataNode;
+import org.springframework.stereotype.Component;
+
+@RequiredArgsConstructor
+@Component
+@Slf4j
+public class ModuleSyncTasks {
+ private final InventoryPersistence inventoryPersistence;
+ private final SyncUtils syncUtils;
+ private final ModuleSyncService moduleSyncService;
+ private final LcmEventsCmHandleStateHandler lcmEventsCmHandleStateHandler;
+
+ private static final CompletableFuture<Void> COMPLETED_FUTURE = CompletableFuture.completedFuture(null);
+
+ /**
+ * Perform module sync on a batch of cm handles.
+ *
+ * @param cmHandlesAsDataNodes a batch of Data nodes representing cm handles to perform module sync on
+ * @return completed future to handle post-processing
+ */
+ public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes) {
+ final Map<YangModelCmHandle, CmHandleState> cmHandelStatePerCmHandle = new HashMap<>();
+ for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) {
+ final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id"));
+ final YangModelCmHandle yangModelCmHandle =
+ YangDataConverter.convertCmHandleToYangModel(cmHandleAsDataNode, cmHandleId);
+ final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
+ try {
+ moduleSyncService.deleteSchemaSetIfExists(cmHandleId);
+ moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle);
+ cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY);
+ } catch (final Exception e) {
+ syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
+ LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
+ setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason());
+ cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED);
+ }
+ log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
+ }
+ updateCmHandlesStateBatch(cmHandelStatePerCmHandle);
+ return COMPLETED_FUTURE;
+ }
+
+ /**
+ * Reset state to "ADVISED" for any previously failed cm handles.
+ *
+ * @param failedCmHandles previously failed (locked) cm handles
+ * @return completed future to handle post-processing
+ */
+ public CompletableFuture<Void> resetFailedCmHandles(final List<YangModelCmHandle> failedCmHandles) {
+ final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(failedCmHandles.size());
+ for (final YangModelCmHandle failedCmHandle : failedCmHandles) {
+ final CompositeState compositeState = failedCmHandle.getCompositeState();
+ final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState);
+ if (isReadyForRetry) {
+ log.debug("Reset cm handle {} state to ADVISED to be re-attempted by module-sync watchdog",
+ failedCmHandle.getId());
+ cmHandleStatePerCmHandle.put(failedCmHandle, CmHandleState.ADVISED);
+ }
+ }
+ updateCmHandlesStateBatch(cmHandleStatePerCmHandle);
+ return COMPLETED_FUTURE;
+ }
+
+ private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle,
+ final CompositeState.LockReason lockReason) {
+ advisedCmHandle.getCompositeState().setLockReason(lockReason);
+ }
+
+ private void updateCmHandlesStateBatch(final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle) {
+ // To be refactored as part of CPS-1231; Use state-save-batch capability (depends sub-task12, 13)
+ for (final Map.Entry<YangModelCmHandle, CmHandleState> entry : cmHandleStatePerCmHandle.entrySet()) {
+ lcmEventsCmHandleStateHandler.updateCmHandleState(entry.getKey(), entry.getValue());
+ }
+ }
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
index be811a1..8074fe6 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
@@ -21,17 +21,16 @@
package org.onap.cps.ncmp.api.inventory.sync;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
-import org.onap.cps.ncmp.api.inventory.CmHandleState;
-import org.onap.cps.ncmp.api.inventory.CompositeState;
-import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
-import org.onap.cps.ncmp.api.inventory.LockReasonCategory;
-import org.springframework.beans.factory.annotation.Qualifier;
+import org.onap.cps.spi.model.DataNode;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@@ -40,75 +39,75 @@
@Component
public class ModuleSyncWatchdog {
- private static final boolean MODEL_SYNC_IN_PROGRESS = false;
- private static final boolean MODEL_SYNC_DONE = true;
-
- private final InventoryPersistence inventoryPersistence;
-
private final SyncUtils syncUtils;
+ private final BlockingQueue<DataNode> moduleSyncWorkQueue;
+ private final Map<String, Object> moduleSyncStartedOnCmHandles;
+ private final ModuleSyncTasks moduleSyncTasks;
- private final ModuleSyncService moduleSyncService;
-
- @Qualifier("moduleSyncSemaphores")
- private final ConcurrentMap<String, Boolean> moduleSyncSemaphores;
-
- private final LcmEventsCmHandleStateHandler lcmEventsCmHandleStateHandler;
+ private static final int MODULE_SYNC_BATCH_SIZE = 100;
+ private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10;
+ private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started";
/**
* Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'.
*/
- @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}")
- public void executeAdvisedCmHandlePoll() {
- syncUtils.getAdvisedCmHandles().forEach(advisedCmHandle -> {
- final String cmHandleId = advisedCmHandle.getId();
- if (hasPushedIntoSemaphoreMap(cmHandleId)) {
- log.debug("executing module sync on {}", cmHandleId);
- final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
- try {
- moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle);
- moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle);
- lcmEventsCmHandleStateHandler.updateCmHandleState(advisedCmHandle, CmHandleState.READY);
- updateModuleSyncSemaphoreMap(cmHandleId);
- } catch (final Exception e) {
- syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
- LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
- setCmHandleStateLocked(advisedCmHandle, compositeState.getLockReason());
- }
- log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
- } else {
- log.debug("{} already processed by another instance", cmHandleId);
- }
- });
- log.debug("No Cm-Handles currently found in an ADVISED state");
+ @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:5000}")
+ public void moduleSyncAdvisedCmHandles() {
+ populateWorkQueueIfNeeded();
+ while (!moduleSyncWorkQueue.isEmpty()) {
+ final Collection<DataNode> nextBatch = prepareNextBatch();
+ moduleSyncTasks.performModuleSync(nextBatch);
+ preventBusyWait();
+ }
}
/**
- * Execute Cm Handle poll which changes the cm handle state from 'LOCKED' to 'ADVISED'.
+ * Find any failed (locked) cm handles and change state back to 'ADVISED'.
*/
@Scheduled(fixedDelayString = "${timers.locked-modules-sync.sleep-time-ms:300000}")
- public void executeLockedCmHandlePoll() {
- final List<YangModelCmHandle> lockedCmHandles = syncUtils.getModuleSyncFailedCmHandles();
- for (final YangModelCmHandle lockedCmHandle : lockedCmHandles) {
- final CompositeState compositeState = lockedCmHandle.getCompositeState();
- final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState);
- if (isReadyForRetry) {
- log.debug("Reset cm handle {} state to ADVISED to re-attempt module-sync", lockedCmHandle.getId());
- lcmEventsCmHandleStateHandler.updateCmHandleState(lockedCmHandle, CmHandleState.ADVISED);
+ public void resetPreviouslyFailedCmHandles() {
+ final List<YangModelCmHandle> failedCmHandles = syncUtils.getModuleSyncFailedCmHandles();
+ moduleSyncTasks.resetFailedCmHandles(failedCmHandles);
+ }
+
+ private void preventBusyWait() {
+ // This method isn't really needed until CPS-1200 Performance Improvement: Watchdog Parallel execution
+ // but leaving here to minimize impacts on this class for that Jira
+ try {
+ TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void populateWorkQueueIfNeeded() {
+ if (moduleSyncWorkQueue.isEmpty()) {
+ final List<DataNode> advisedCmHandles = syncUtils.getAdvisedCmHandles();
+ for (final DataNode advisedCmHandle : advisedCmHandles) {
+ if (!moduleSyncWorkQueue.offer(advisedCmHandle)) {
+ log.warn("Unable to add cm handle {} to the work queue", advisedCmHandle.getLeaves().get("id"));
+ }
}
}
}
- private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle,
- final CompositeState.LockReason lockReason) {
- advisedCmHandle.getCompositeState().setLockReason(lockReason);
- lcmEventsCmHandleStateHandler.updateCmHandleState(advisedCmHandle, CmHandleState.LOCKED);
+ private Collection<DataNode> prepareNextBatch() {
+ final Collection<DataNode> nextBatchCandidates = new HashSet<>(MODULE_SYNC_BATCH_SIZE);
+ final Collection<DataNode> nextBatch = new HashSet<>(MODULE_SYNC_BATCH_SIZE);
+ moduleSyncWorkQueue.drainTo(nextBatchCandidates, MODULE_SYNC_BATCH_SIZE);
+ log.debug("nextBatchCandidates size : {}", nextBatchCandidates.size());
+ for (final DataNode batchCandidate : nextBatchCandidates) {
+ final String cmHandleId = String.valueOf(batchCandidate.getLeaves().get("id"));
+ final boolean alreadyAddedToInProgressMap = VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP
+ .equals(moduleSyncStartedOnCmHandles.putIfAbsent(cmHandleId, VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP));
+ if (alreadyAddedToInProgressMap) {
+ log.debug("module sync for {} already in progress by other instance", cmHandleId);
+ } else {
+ nextBatch.add(batchCandidate);
+ }
+ }
+ log.debug("nextBatch size : {}", nextBatch.size());
+ return nextBatch;
}
- private void updateModuleSyncSemaphoreMap(final String cmHandleId) {
- moduleSyncSemaphores.replace(cmHandleId, MODEL_SYNC_DONE);
- }
-
- private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) {
- return moduleSyncSemaphores.putIfAbsent(cmHandleId, MODEL_SYNC_IN_PROGRESS) == null;
- }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
index 16fb8f4..537f501 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
@@ -64,19 +64,14 @@
private static final Pattern retryAttemptPattern = Pattern.compile("^Attempt #(\\d+) failed:");
/**
- * Query data nodes for cm handles with an "ADVISED" cm handle state, and select a random entry for processing.
+ * Query data nodes for cm handles with an "ADVISED" cm handle state.
*
- * @return a randomized yang model cm handle list with ADVISED state, return empty list if not found
+ * @return cm handles (data nodes) in ADVISED state (empty list if none found)
*/
- public List<YangModelCmHandle> getAdvisedCmHandles() {
- final List<DataNode> advisedCmHandlesAsDataNodeList = new ArrayList<>(
- cmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED));
- log.debug("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodeList.size());
- if (advisedCmHandlesAsDataNodeList.isEmpty()) {
- return Collections.emptyList();
- }
- Collections.shuffle(advisedCmHandlesAsDataNodeList);
- return convertCmHandlesDataNodesToYangModelCmHandles(advisedCmHandlesAsDataNodeList);
+ public List<DataNode> getAdvisedCmHandles() {
+ final List<DataNode> advisedCmHandlesAsDataNodes = cmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED);
+ log.debug("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodes.size());
+ return advisedCmHandlesAsDataNodes;
}
/**
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy
new file mode 100644
index 0000000..80aa81b
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.cps.ncmp.api.impl.config.embeddedcache
+import com.hazelcast.core.Hazelcast
+import org.onap.cps.spi.model.DataNode
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.test.context.ContextConfiguration
+import spock.lang.Specification
+import java.util.concurrent.BlockingQueue
+
+@SpringBootTest
+@ContextConfiguration(classes = [SynchronizationCacheConfig])
+class SynchronizationCacheConfigSpec extends Specification {
+
+ @Autowired
+ private BlockingQueue<DataNode> moduleSyncWorkQueue
+
+ @Autowired
+ private Map<String, Object> moduleSyncStartedOnCmHandles
+
+ @Autowired
+ private Map<String, Boolean> dataSyncSemaphores
+
+ def 'Embedded (hazelcast) Caches for Module and Data Sync.'() {
+ expect: 'system is able to create an instance of the Module Sync Work Queue'
+ assert null != moduleSyncWorkQueue
+ and: 'system is able to create an instance of a map to hold cm handles which have started (and maybe finished) module sync'
+ assert null != moduleSyncStartedOnCmHandles
+ and: 'system is able to create an instance of a map to hold data sync semaphores'
+ assert null != dataSyncSemaphores
+ and: 'there 3 instances'
+ assert Hazelcast.allHazelcastInstances.size() == 3
+ and: 'they have the correct names (in any order)'
+ assert Hazelcast.allHazelcastInstances.name.containsAll('moduleSyncWorkQueue', 'moduleSyncStartedOnCmHandles', 'dataSyncSemaphores' )
+ }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy
deleted file mode 100644
index ea84b44..0000000
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * ============LICENSE_START========================================================
- * Copyright (C) 2022 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.config.embeddedcache
-
-import com.hazelcast.core.Hazelcast
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.boot.test.context.SpringBootTest
-import org.springframework.test.context.ContextConfiguration
-import spock.lang.Specification
-
-@SpringBootTest
-@ContextConfiguration(classes = [SynchronizationSemaphoresConfig])
-class SynchronizationSemaphoresConfigSpec extends Specification {
-
- @Autowired
- private Map<String, Boolean> moduleSyncSemaphores;
-
- @Autowired
- private Map<String, Boolean> dataSyncSemaphores;
-
- def 'Embedded Sync Semaphores'() {
- expect: 'system is able to create an instance of ModuleSyncSemaphores'
- assert null != moduleSyncSemaphores
- and: 'system is able to create an instance of DataSyncSemaphores'
- assert null != dataSyncSemaphores
- and: 'we have 2 instances'
- assert Hazelcast.allHazelcastInstances.size() == 2
- and: 'the names match'
- assert Hazelcast.allHazelcastInstances.name == ['moduleSyncSemaphores', 'dataSyncSemaphores']
- }
-}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncServiceSpec.groovy
index 6a2fbe8..78da7eb 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncServiceSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncServiceSpec.groovy
@@ -24,8 +24,6 @@
import org.onap.cps.api.CpsModuleService
import org.onap.cps.ncmp.api.impl.operations.DmiModelOperations
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
-import org.onap.cps.ncmp.api.inventory.CmHandleState
-import org.onap.cps.ncmp.api.inventory.CompositeState
import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle
import org.onap.cps.spi.CascadeDeleteAllowed
import org.onap.cps.spi.exceptions.SchemaSetNotFoundException
@@ -34,7 +32,6 @@
class ModuleSyncServiceSpec extends Specification {
-
def mockCpsModuleService = Mock(CpsModuleService)
def mockDmiModelOperations = Mock(DmiModelOperations)
def mockCpsAdminService = Mock(CpsAdminService)
@@ -72,38 +69,27 @@
}
def 'Delete Schema Set for CmHandle' () {
- given: 'a CmHandle in the advised state'
- def cmHandle = new YangModelCmHandle(id: 'some-cmhandle-id', compositeState: new CompositeState(cmHandleState: CmHandleState.ADVISED))
- and: 'the Schema Set exists for the CmHandle'
- 1 * mockCpsModuleService.deleteSchemaSet(_ as String, 'some-cmhandle-id',
- CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED)
when: 'delete schema set if exists is called'
- objectUnderTest.deleteSchemaSetIfExists(cmHandle)
- then: 'there are no exceptions'
- noExceptionThrown()
+ objectUnderTest.deleteSchemaSetIfExists('some-cmhandle-id')
+ then: 'the module service is invoked to delete the correct schema set'
+ 1 * mockCpsModuleService.deleteSchemaSet(expectedDataspaceName, 'some-cmhandle-id', CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED)
}
def 'Delete a non-existing Schema Set for CmHandle' () {
- given: 'a CmHandle in the advised state'
- def cmHandle = new YangModelCmHandle(id: 'some-cmhandle-id', compositeState: new CompositeState(cmHandleState: CmHandleState.ADVISED))
- and: 'the DB throws an exception because its Schema Set does not exist'
- 1 * mockCpsModuleService.deleteSchemaSet(_ as String, 'some-cmhandle-id',
- CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED) >> { throw new SchemaSetNotFoundException('some-dataspace-name', 'some-cmhandle-id') }
+ given: 'the DB throws an exception because its Schema Set does not exist'
+ mockCpsModuleService.deleteSchemaSet(*_) >> { throw new SchemaSetNotFoundException('some-dataspace-name', 'some-cmhandle-id') }
when: 'delete schema set if exists is called'
- objectUnderTest.deleteSchemaSetIfExists(cmHandle)
- then: 'there are no exceptions'
+ objectUnderTest.deleteSchemaSetIfExists('some-cmhandle-id')
+ then: 'the exception from the DB is ignored; there are no exceptions'
noExceptionThrown()
}
def 'Delete Schema Set for CmHandle with other exception' () {
- given: 'a CmHandle in the advised state'
- def cmHandle = new YangModelCmHandle(id: 'some-cmhandle-id', compositeState: new CompositeState(cmHandleState: CmHandleState.ADVISED))
- and: 'an exception other than SchemaSetNotFoundException is thrown'
+ given: 'an exception other than SchemaSetNotFoundException is thrown'
UnsupportedOperationException unsupportedOperationException = new UnsupportedOperationException();
- 1 * mockCpsModuleService.deleteSchemaSet(_ as String, 'some-cmhandle-id',
- CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED) >> { throw unsupportedOperationException }
+ 1 * mockCpsModuleService.deleteSchemaSet(*_) >> { throw unsupportedOperationException }
when: 'delete schema set if exists is called'
- objectUnderTest.deleteSchemaSetIfExists(cmHandle)
+ objectUnderTest.deleteSchemaSetIfExists('some-cmhandle-id')
then: 'an exception is thrown'
def result = thrown(UnsupportedOperationException)
result == unsupportedOperationException
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy
new file mode 100644
index 0000000..291ba96
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy
@@ -0,0 +1,110 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * Modifications Copyright (C) 2022 Bell Canada
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.inventory.sync
+
+import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler
+import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
+import org.onap.cps.ncmp.api.inventory.CmHandleState
+import org.onap.cps.ncmp.api.inventory.CompositeState
+import org.onap.cps.ncmp.api.inventory.CompositeStateBuilder
+import org.onap.cps.ncmp.api.inventory.InventoryPersistence
+import org.onap.cps.ncmp.api.inventory.LockReasonCategory
+import org.onap.cps.spi.model.DataNode
+import spock.lang.Specification
+
+class ModuleSyncTasksSpec extends Specification {
+
+ def mockInventoryPersistence = Mock(InventoryPersistence)
+
+ def mockSyncUtils = Mock(SyncUtils)
+
+ def mockModuleSyncService = Mock(ModuleSyncService)
+
+ def mockLcmEventsCmHandleStateHandler = Mock(LcmEventsCmHandleStateHandler)
+
+ def objectUnderTest = new ModuleSyncTasks(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService, mockLcmEventsCmHandleStateHandler)
+
+ def 'Module Sync ADVISED cm handles.'() {
+ given: 'cm handles in an ADVISED state'
+ def cmHandle1 = advisedCmHandleAsDataNode('cm-handle-1')
+ def cmHandle2 = advisedCmHandleAsDataNode('cm-handle-2')
+ and: 'the inventory persistence cm handle returns a ADVISED state for the any handle'
+ mockInventoryPersistence.getCmHandleState(_) >> new CompositeState(cmHandleState: CmHandleState.ADVISED)
+ when: 'module sync poll is executed'
+ objectUnderTest.performModuleSync([cmHandle1, cmHandle2])
+ then: 'module sync service deletes schemas set of each cm handle if it already exists'
+ 1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-1')
+ 1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-2')
+ and: 'module sync service is invoked for each cm handle'
+ 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args,'cm-handle-1') }
+ 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args,'cm-handle-2') }
+ and: 'the state handler is called for the both cm handles'
+ 2 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.READY)
+ }
+
+ def 'Module Sync ADVISED cm handle with failure during sync.'() {
+ given: 'a cm handle in an ADVISED state'
+ def cmHandle = advisedCmHandleAsDataNode('cm-handle')
+ and: 'the inventory persistence cm handle returns a ADVISED state for the cm handle'
+ def cmHandleState = new CompositeState(cmHandleState: CmHandleState.ADVISED)
+ 1 * mockInventoryPersistence.getCmHandleState('cm-handle') >> cmHandleState
+ and: 'module sync service attempts to sync the cm handle and throws an exception'
+ 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(*_) >> { throw new Exception('some exception') }
+ when: 'module sync is executed'
+ objectUnderTest.performModuleSync([cmHandle])
+ then: 'update lock reason, details and attempts is invoked'
+ 1 * mockSyncUtils.updateLockReasonDetailsAndAttempts(cmHandleState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED ,'some exception')
+ and: 'the state handler is called to update the state to LOCKED'
+ 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.LOCKED)
+ }
+
+ def 'Reset failed CM Handles #scenario.'() {
+ given: 'cm handles in an locked state'
+ def lockedState = new CompositeStateBuilder().withCmHandleState(CmHandleState.LOCKED)
+ .withLockReason(LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, '').withLastUpdatedTimeNow().build()
+ def yangModelCmHandle1 = new YangModelCmHandle(id: 'cm-handle-1', compositeState: lockedState)
+ def yangModelCmHandle2 = new YangModelCmHandle(id: 'cm-handle-2', compositeState: lockedState)
+ and: 'sync utils retry locked cm handle returns #isReadyForRetry'
+ mockSyncUtils.isReadyForRetry(lockedState) >>> isReadyForRetry
+ when: 'resetting failed cm handles'
+ objectUnderTest.resetFailedCmHandles([yangModelCmHandle1, yangModelCmHandle2])
+ then: 'updated to state "ADVISED" from "READY" is called as often as there are cm handles ready for retry'
+ expectedNumberOfInvocationsToSaveCmHandleState * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.ADVISED)
+ where:
+ scenario | isReadyForRetry || expectedNumberOfInvocationsToSaveCmHandleState
+ 'retry locked cm handle once' | [true, false] || 1
+ 'retry locked cm handle twice' | [true, true] || 2
+ 'do not retry locked cm handle' | [false, false] || 0
+ }
+
+ def advisedCmHandleAsDataNode(cmHandleId) {
+ return new DataNode(anchorName:cmHandleId, leaves:['id':cmHandleId, 'cm-handle-state':'ADVISED'])
+ }
+
+ def assertYamgModelCmHandleArgument(args, expectedCmHandleId) {
+ {
+ def yangModelCmHandle = args[0]
+ assert yangModelCmHandle.id == expectedCmHandleId
+ }
+ return true
+ }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
index 81268cb..43f492d 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
@@ -21,98 +21,56 @@
package org.onap.cps.ncmp.api.inventory.sync
-
-import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
-import org.onap.cps.ncmp.api.inventory.CmHandleState
-import org.onap.cps.ncmp.api.inventory.CompositeState
-import org.onap.cps.ncmp.api.inventory.InventoryPersistence
-import org.onap.cps.ncmp.api.inventory.LockReasonCategory
-import org.onap.cps.ncmp.api.inventory.CompositeStateBuilder
-import spock.lang.Specification
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.ConcurrentMap
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.BlockingQueue
+import org.onap.cps.spi.model.DataNode
+import spock.lang.Specification
class ModuleSyncWatchdogSpec extends Specification {
- def mockInventoryPersistence = Mock(InventoryPersistence)
-
def mockSyncUtils = Mock(SyncUtils)
- def mockModuleSyncService = Mock(ModuleSyncService)
+ def static testQueueCapacity = 50 + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE
- def stubbedMap = Stub(ConcurrentMap)
+ BlockingQueue<DataNode> moduleSyncWorkQueue = new ArrayBlockingQueue(testQueueCapacity)
- def mockLcmEventsCmHandleStateHandler = Mock(LcmEventsCmHandleStateHandler)
+ def moduleSyncStartedOnCmHandles = [:]
- def cmHandleState = CmHandleState.ADVISED
+ def mockModuleSyncTasks = Mock(ModuleSyncTasks)
- def objectUnderTest = new ModuleSyncWatchdog(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService, stubbedMap as ConcurrentHashMap, mockLcmEventsCmHandleStateHandler)
+ def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , moduleSyncStartedOnCmHandles, mockModuleSyncTasks)
- def 'Schedule a Cm-Handle Sync for ADVISED Cm-Handles'() {
- given: 'cm handles in an advised state and a data sync state'
- def compositeState1 = new CompositeState(cmHandleState: cmHandleState)
- def compositeState2 = new CompositeState(cmHandleState: cmHandleState)
- def yangModelCmHandle1 = new YangModelCmHandle(id: 'some-cm-handle', compositeState: compositeState1)
- def yangModelCmHandle2 = new YangModelCmHandle(id: 'some-cm-handle-2', compositeState: compositeState2)
- and: 'sync utilities return a cm handle twice'
- mockSyncUtils.getAdvisedCmHandles() >> [yangModelCmHandle1, yangModelCmHandle2]
- when: 'module sync poll is executed'
- objectUnderTest.executeAdvisedCmHandlePoll()
- then: 'the inventory persistence cm handle returns a composite state for the first cm handle'
- 1 * mockInventoryPersistence.getCmHandleState('some-cm-handle') >> compositeState1
- and: 'module sync service deletes schema set of cm handle if it exists'
- 1 * mockModuleSyncService.deleteSchemaSetIfExists(yangModelCmHandle1)
- and: 'module sync service syncs the first cm handle and creates a schema set'
- 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle1)
- then: 'the state handler is called for the first cm handle'
- 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle1, CmHandleState.READY)
- and: 'the inventory persistence cm handle returns a composite state for the second cm handle'
- mockInventoryPersistence.getCmHandleState('some-cm-handle-2') >> compositeState2
- and: 'module sync service syncs the second cm handle and creates a schema set'
- 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle2)
- then: 'the state handler is called for the second cm handle'
- 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle2, CmHandleState.READY)
- }
-
- def 'Schedule a Cm-Handle Sync for ADVISED Cm-Handle with failure'() {
- given: 'cm handles in an advised state'
- def compositeState = new CompositeState(cmHandleState: cmHandleState)
- def yangModelCmHandle = new YangModelCmHandle(id: 'some-cm-handle', compositeState: compositeState)
- and: 'sync utilities return a cm handle'
- mockSyncUtils.getAdvisedCmHandles() >> [yangModelCmHandle]
- when: 'module sync poll is executed'
- objectUnderTest.executeAdvisedCmHandlePoll()
- then: 'the inventory persistence cm handle returns a composite state for the cm handle'
- 1 * mockInventoryPersistence.getCmHandleState('some-cm-handle') >> compositeState
- and: 'module sync service attempts to sync the cm handle and throws an exception'
- 1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(*_) >> { throw new Exception('some exception') }
- and: 'update lock reason, details and attempts is invoked'
- 1 * mockSyncUtils.updateLockReasonDetailsAndAttempts(compositeState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED ,'some exception')
- and: 'the state handler is called to update the state to LOCKED'
- 1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle, CmHandleState.LOCKED)
- }
-
- def 'Schedule a Cm-Handle Sync with condition #scenario '() {
- given: 'cm handles in an locked state'
- def compositeState = new CompositeStateBuilder().withCmHandleState(CmHandleState.LOCKED)
- .withLockReason(LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, '').withLastUpdatedTimeNow().build()
- def yangModelCmHandle = new YangModelCmHandle(id: 'some-cm-handle', compositeState: compositeState)
- and: 'sync utilities return a cm handle twice'
- mockSyncUtils.getModuleSyncFailedCmHandles() >> [yangModelCmHandle, yangModelCmHandle]
- and: 'inventory persistence returns the composite state of the cm handle'
- mockInventoryPersistence.getCmHandleState(yangModelCmHandle.getId()) >> compositeState
- and: 'sync utils retry locked cm handle returns #isReadyForRetry'
- mockSyncUtils.isReadyForRetry(compositeState) >>> isReadyForRetry
- when: 'module sync poll is executed'
- objectUnderTest.executeLockedCmHandlePoll()
- then: 'the first cm handle is updated to state "ADVISED" from "READY"'
- expectedNumberOfInvocationsToSaveCmHandleState * mockLcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle, CmHandleState.ADVISED)
+ def 'Module sync #scenario , #numberOfAdvisedCmHandles advised cm handles.'() {
+ given: 'sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
+ mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles)
+ when: ' module sync is started'
+ objectUnderTest.moduleSyncAdvisedCmHandles()
+ then: 'it performs #expectedNumberOfTaskExecutions tasks'
+ expectedNumberOfTaskExecutions * mockModuleSyncTasks.performModuleSync(_)
where:
- scenario | isReadyForRetry || expectedNumberOfInvocationsToSaveCmHandleState
- 'retry locked cm handle once' | [true, false] || 1
- 'retry locked cm handle twice' | [true, true] || 2
- 'do not retry locked cm handle' | [false, false] || 0
+ scenario | numberOfAdvisedCmHandles || expectedNumberOfTaskExecutions
+ 'less then 1 batch' | 1 || 1
+ 'exactly 1 batch' | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 1
+ '2 batches' | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 2
+ 'queue capacity' | testQueueCapacity || 3
+ 'over queue capacity' | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3
+ }
+
+ def 'Reset failed cm handles.'() {
+ given: 'sync utilities returns failed cm handles'
+ def failedCmHandles = [new YangModelCmHandle()]
+ mockSyncUtils.getModuleSyncFailedCmHandles() >> failedCmHandles
+ when: ' reset failed cm handles is started'
+ objectUnderTest.resetPreviouslyFailedCmHandles()
+ then: 'it is delegated to the module sync task (service)'
+ 1 * mockModuleSyncTasks.resetFailedCmHandles(failedCmHandles)
+ }
+
+ def createDataNodes(numberOfDataNodes) {
+ def dataNodes = []
+ (1..numberOfDataNodes).each {dataNodes.add(new DataNode())}
+ return dataNodes
}
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy
index 52fb110..6ccdcf1 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy
@@ -47,8 +47,6 @@
class SyncUtilsSpec extends Specification{
- def mockInventoryPersistence = Mock(InventoryPersistence)
-
def mockCmHandleQueries = Mock(CmHandleQueries)
def mockDmiDataOperations = Mock(DmiDataOperations)
@@ -63,28 +61,14 @@
@Shared
def dataNode = new DataNode(leaves: ['id': 'cm-handle-123'])
- @Shared
- def dataNodeAdditionalProperties = new DataNode(leaves: ['name': 'dmiProp1', 'value': 'dmiValue1'])
-
def 'Get an advised Cm-Handle where ADVISED cm handle #scenario'() {
given: 'the inventory persistence service returns a collection of data nodes'
mockCmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED) >> dataNodeCollection
- and: 'we have some additional (dmi, private) properties'
- dataNodeAdditionalProperties.xpath = dataNode.xpath + '/additional-properties[@name="dmiProp1"]'
- dataNode.childDataNodes = [dataNodeAdditionalProperties]
when: 'get advised cm handles are fetched'
def yangModelCmHandles = objectUnderTest.getAdvisedCmHandles()
then: 'the returned data node collection is the correct size'
yangModelCmHandles.size() == expectedDataNodeSize
- and: 'if there is a data node the additional (dmi, private) properties are included'
- if (expectedDataNodeSize > 0) {
- assert yangModelCmHandles[0].dmiProperties[0].name == 'dmiProp1'
- assert yangModelCmHandles[0].dmiProperties[0].value == 'dmiValue1'
- }
- and: 'yang model collection contains the correct data'
- yangModelCmHandles.stream().map(yangModel -> yangModel.id).collect(Collectors.toSet()) ==
- dataNodeCollection.stream().map(dataNode -> dataNode.leaves.get("id")).collect(Collectors.toSet())
where: 'the following scenarios are used'
scenario | dataNodeCollection || expectedCallsToGetYangModelCmHandle | expectedDataNodeSize
'exists' | [dataNode] || 1 | 1
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/YangDataConverterSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/YangDataConverterSpec.groovy
new file mode 100644
index 0000000..20d384f
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/YangDataConverterSpec.groovy
@@ -0,0 +1,42 @@
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.utils
+
+import org.onap.cps.ncmp.api.impl.utils.YangDataConverter
+import org.onap.cps.spi.model.DataNode
+import spock.lang.Specification
+
+class YangDataConverterSpec extends Specification{
+
+ def 'Convert a cm handle data node with private properties.'() {
+ given: 'a datanode with some additional (dmi, private) properties'
+ def dataNodeAdditionalProperties = new DataNode(xpath:'/additional-properties[@name="dmiProp1"]',
+ leaves: ['name': 'dmiProp1', 'value': 'dmiValue1'])
+ def dataNode = new DataNode(childDataNodes:[dataNodeAdditionalProperties])
+ when: 'the dataNode is converted'
+ def yangModelCmHandle = YangDataConverter.convertCmHandleToYangModel(dataNode,'sample-id')
+ then: 'the converted object has the correct id'
+ assert yangModelCmHandle.id == 'sample-id'
+ and: 'the additional (dmi, private) properties are included'
+ assert yangModelCmHandle.dmiProperties[0].name == 'dmiProp1'
+ assert yangModelCmHandle.dmiProperties[0].value == 'dmiValue1'
+ }
+}
diff --git a/cps-service/src/main/java/org/onap/cps/spi/model/DataNode.java b/cps-service/src/main/java/org/onap/cps/spi/model/DataNode.java
index d80306b..c77daaf 100644
--- a/cps-service/src/main/java/org/onap/cps/spi/model/DataNode.java
+++ b/cps-service/src/main/java/org/onap/cps/spi/model/DataNode.java
@@ -22,6 +22,7 @@
package org.onap.cps.spi.model;
+import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
@@ -33,7 +34,9 @@
@Setter(AccessLevel.PROTECTED)
@Getter
@EqualsAndHashCode
-public class DataNode {
+public class DataNode implements Serializable {
+
+ private static final long serialVersionUID = 1482619410918597467L;
DataNode() { }