Distributed datastore solution for Data Sync Watchdog
- update lombok config to handle Qualifier annotation
- update Semaphore config to use ConcurrentMap
- update SyncUtils to return a list of cm handles
- update DataSyncWatchdog and ModuleSyncWatchdog with Qualifier
- update DataSyncWatchdog to handle a list of cm handles
- Use get with xpath to check cm handle state
Issue-ID: CPS-1015
Change-Id: Icb39bd29f89e0020d49a1f8960476ffe81b12362
Signed-off-by: kissand <andras.zoltan.kiss@est.tech>
diff --git a/cps-ncmp-service/lombok.config b/cps-ncmp-service/lombok.config
index 0736fc5..b60a192 100644
--- a/cps-ncmp-service/lombok.config
+++ b/cps-ncmp-service/lombok.config
@@ -1,5 +1,5 @@
# ============LICENSE_START=======================================================
-# Copyright (C) 2021 Nordix Foundation
+# Copyright (C) 2021-2022 Nordix Foundation
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,3 +18,4 @@
config.stopBubbling = true
lombok.addLombokGeneratedAnnotation = true
+lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier
\ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyCmHandlerQueryServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyCmHandlerQueryServiceImpl.java
index 6696f8e..f8836e6 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyCmHandlerQueryServiceImpl.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyCmHandlerQueryServiceImpl.java
@@ -148,7 +148,7 @@
cpsPathQueryResult = NO_QUERY_TO_EXECUTE;
} else {
try {
- cpsPathQueryResult = cmHandleQueries.getCmHandleDataNodesByCpsPath(
+ cpsPathQueryResult = cmHandleQueries.queryCmHandleDataNodesByCpsPath(
cpsPath.get("cpsPath"), INCLUDE_ALL_DESCENDANTS)
.stream().map(this::createNcmpServiceCmHandle)
.collect(Collectors.toMap(NcmpServiceCmHandle::getCmHandleId,
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java
index 1efe176..571558a 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java
@@ -1,5 +1,5 @@
/*
- * ============LICENSE_START========================================================
+ * ===========LICENSE_START========================================================
* Copyright (C) 2022 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -24,7 +24,6 @@
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
-import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.springframework.context.annotation.Bean;
@@ -41,23 +40,23 @@
/**
* Module Sync Distributed Map Instance.
*
- * @return configured map of module sync semaphore
+ * @return configured map of module sync semaphores
*/
@Bean
- public ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap() {
- return createHazelcastInstance("moduleSyncSemaphore", "moduleSyncSemaphoreConfig")
- .getMap("moduleSyncSemaphore");
+ public ConcurrentMap<String, Boolean> moduleSyncSemaphores() {
+ return createHazelcastInstance("moduleSyncSemaphores", "moduleSyncSemaphoresConfig")
+ .getMap("moduleSyncSemaphores");
}
/**
* Data Sync Distributed Map Instance.
*
- * @return configured map of data sync semaphore
+ * @return configured map of data sync semaphores
*/
@Bean
- public Map<String, String> dataSyncSemaphoreMap() {
- return createHazelcastInstance("dataSyncSemaphore", "dataSyncSemaphoreConfig")
- .getMap("dataSyncSemaphore");
+ public ConcurrentMap<String, Boolean> dataSyncSemaphores() {
+ return createHazelcastInstance("dataSyncSemaphores", "dataSyncSemaphoresConfig")
+ .getMap("dataSyncSemaphores");
}
private HazelcastInstance createHazelcastInstance(
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/CmHandleQueries.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/CmHandleQueries.java
index 92387ba..2451617 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/CmHandleQueries.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/CmHandleQueries.java
@@ -22,6 +22,7 @@
import static org.onap.cps.ncmp.api.impl.utils.YangDataConverter.convertYangModelCmHandleToNcmpServiceCmHandle;
import static org.onap.cps.spi.FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS;
+import static org.onap.cps.spi.FetchDescendantsOption.OMIT_DESCENDANTS;
import java.util.Collection;
import java.util.Collections;
@@ -65,7 +66,7 @@
final String cpsPath = "//public-properties[@name=\"" + publicPropertyQueryPair.getKey()
+ "\" and @value=\"" + publicPropertyQueryPair.getValue() + "\"]";
- final Collection<DataNode> dataNodes = getCmHandleDataNodesByCpsPath(cpsPath, INCLUDE_ALL_DESCENDANTS);
+ final Collection<DataNode> dataNodes = queryCmHandleDataNodesByCpsPath(cpsPath, INCLUDE_ALL_DESCENDANTS);
if (cmHandleIdToNcmpServiceCmHandles == null) {
cmHandleIdToNcmpServiceCmHandles = collectDataNodesToNcmpServiceCmHandles(dataNodes);
} else {
@@ -108,8 +109,8 @@
* @param cmHandleState cm handle state
* @return a list of cm handles
*/
- public List<DataNode> getCmHandlesByState(final CmHandleState cmHandleState) {
- return getCmHandleDataNodesByCpsPath("//state[@cm-handle-state=\"" + cmHandleState + "\"]",
+ public List<DataNode> queryCmHandlesByState(final CmHandleState cmHandleState) {
+ return queryCmHandleDataNodesByCpsPath("//state[@cm-handle-state=\"" + cmHandleState + "\"]",
INCLUDE_ALL_DESCENDANTS);
}
@@ -119,21 +120,23 @@
* @param cpsPath cps path for which the cmHandle is requested
* @return a list of data nodes representing the cm handles.
*/
- public List<DataNode> getCmHandleDataNodesByCpsPath(final String cpsPath,
- final FetchDescendantsOption fetchDescendantsOption) {
+ public List<DataNode> queryCmHandleDataNodesByCpsPath(final String cpsPath,
+ final FetchDescendantsOption fetchDescendantsOption) {
return cpsDataPersistenceService.queryDataNodes(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR,
cpsPath + ANCESTOR_CM_HANDLES, fetchDescendantsOption);
}
/**
- * Method which returns cm handles by the cm handle id and state.
+ * Method to check the state of a cm handle with given id.
+ *
* @param cmHandleId cm handle id
- * @param cmHandleState cm handle state
- * @return a list of cm handles
+ * @param requiredCmHandleState the required state of the cm handle
+ * @return a boolean, true if the state is equal to the required state
*/
- public List<DataNode> getCmHandlesByIdAndState(final String cmHandleId, final CmHandleState cmHandleState) {
- return getCmHandleDataNodesByCpsPath("//cm-handles[@id='" + cmHandleId + "']/state[@cm-handle-state=\""
- + cmHandleState + "\"]", FetchDescendantsOption.OMIT_DESCENDANTS);
+ public boolean cmHandleHasState(final String cmHandleId, final CmHandleState requiredCmHandleState) {
+ final DataNode stateDataNode = getCmHandleState(cmHandleId);
+ final String cmHandleStateAsString = (String) stateDataNode.getLeaves().get("cm-handle-state");
+ return CmHandleState.valueOf(cmHandleStateAsString).equals(requiredCmHandleState);
}
/**
@@ -141,8 +144,8 @@
* @param dataStoreSyncState sync state
* @return a list of cm handles
*/
- public List<DataNode> getCmHandlesByOperationalSyncState(final DataStoreSyncState dataStoreSyncState) {
- return getCmHandleDataNodesByCpsPath("//state/datastores" + "/operational[@sync-state=\""
+ public List<DataNode> queryCmHandlesByOperationalSyncState(final DataStoreSyncState dataStoreSyncState) {
+ return queryCmHandleDataNodesByCpsPath("//state/datastores" + "/operational[@sync-state=\""
+ dataStoreSyncState + "\"]", FetchDescendantsOption.OMIT_DESCENDANTS);
}
@@ -160,6 +163,12 @@
return convertYangModelCmHandleToNcmpServiceCmHandle(YangDataConverter
.convertCmHandleToYangModel(dataNode, dataNode.getLeaves().get("id").toString()));
}
+
+ private DataNode getCmHandleState(final String cmHandleId) {
+ final String xpath = "/dmi-registry/cm-handles[@id='" + cmHandleId + "']/state";
+ return cpsDataPersistenceService.getDataNode(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR,
+ xpath, OMIT_DESCENDANTS);
+ }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java
index 395fb01..45ba078 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java
@@ -1,5 +1,5 @@
/*
- * ============LICENSE_START=======================================================
+ * ============LICENSE_START=======================================================
* Copyright (C) 2022 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,14 +21,15 @@
package org.onap.cps.ncmp.api.inventory.sync;
import java.time.OffsetDateTime;
+import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.api.CpsDataService;
-import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
import org.onap.cps.ncmp.api.inventory.CompositeState;
import org.onap.cps.ncmp.api.inventory.DataStoreSyncState;
import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@@ -37,39 +38,45 @@
@Service
public class DataSyncWatchdog {
+ private static final boolean DATA_SYNC_IN_PROGRESS = false;
+ private static final boolean DATA_SYNC_DONE = true;
+
private final InventoryPersistence inventoryPersistence;
private final CpsDataService cpsDataService;
private final SyncUtils syncUtils;
+ @Qualifier("dataSyncSemaphores")
+ private final ConcurrentMap<String, Boolean> dataSyncSemaphores;
+
/**
* Execute Cm Handle poll which queries the cm handle state in 'READY' and Operational Datastore Sync State in
* 'UNSYNCHRONIZED'.
*/
@Scheduled(fixedDelayString = "${timers.cm-handle-data-sync.sleep-time-ms:30000}")
public void executeUnSynchronizedReadyCmHandlePoll() {
- YangModelCmHandle unSynchronizedReadyCmHandle = syncUtils.getAnUnSynchronizedReadyCmHandle();
- while (unSynchronizedReadyCmHandle != null) {
+ syncUtils.getUnsynchronizedReadyCmHandles().forEach(unSynchronizedReadyCmHandle -> {
final String cmHandleId = unSynchronizedReadyCmHandle.getId();
- log.debug("Cm-Handles found in READY and UNSYNCHRONIZED state: {}", cmHandleId);
- final CompositeState compositeState = inventoryPersistence
- .getCmHandleState(cmHandleId);
- final String resourceData = syncUtils.getResourceData(cmHandleId);
- if (resourceData == null) {
- log.debug("Error accessing the node for Cm-Handle: {}", cmHandleId);
- } else if (unSynchronizedReadyCmHandle.getCompositeState().getDataSyncEnabled().equals(false)) {
- log.debug("Error: data sync enabled for {} must be true."
- + "Data sync enabled is currently set to false", cmHandleId);
+ if (hasPushedIntoSemaphoreMap(cmHandleId)) {
+ log.debug("Executing data sync on {}", cmHandleId);
+ final CompositeState compositeState = inventoryPersistence
+ .getCmHandleState(cmHandleId);
+ final String resourceData = syncUtils.getResourceData(cmHandleId);
+ if (resourceData == null) {
+ log.debug("Error retrieving resource data for Cm-Handle: {}", cmHandleId);
+ } else {
+ cpsDataService.saveData("NFP-Operational", cmHandleId,
+ resourceData, OffsetDateTime.now());
+ setSyncStateToSynchronized().accept(compositeState);
+ inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
+ updateDataSyncSemaphoreMap(cmHandleId);
+ }
} else {
- cpsDataService.saveData("NFP-Operational", cmHandleId,
- resourceData, OffsetDateTime.now());
- setSyncStateToSynchronized().accept(compositeState);
- inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
+ log.debug("{} already processed by another instance", cmHandleId);
}
- unSynchronizedReadyCmHandle = syncUtils.getAnUnSynchronizedReadyCmHandle();
- }
- log.debug("No Cm-Handles currently found in an READY State and Operational Sync State is UNSYNCHRONIZED");
+ });
+ log.debug("No Cm-Handles currently found in READY State and Operational Sync State is UNSYNCHRONIZED");
}
private Consumer<CompositeState> setSyncStateToSynchronized() {
@@ -81,4 +88,12 @@
.lastSyncTime(CompositeState.nowInSyncTimeFormat()).build());
};
}
+
+ private void updateDataSyncSemaphoreMap(final String cmHandleId) {
+ dataSyncSemaphores.replace(cmHandleId, DATA_SYNC_DONE);
+ }
+
+ private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) {
+ return dataSyncSemaphores.putIfAbsent(cmHandleId, DATA_SYNC_IN_PROGRESS) == null;
+ }
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
index 7c2a4fc..be811a1 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
@@ -31,6 +31,7 @@
import org.onap.cps.ncmp.api.inventory.CompositeState;
import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
import org.onap.cps.ncmp.api.inventory.LockReasonCategory;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@@ -39,13 +40,17 @@
@Component
public class ModuleSyncWatchdog {
+ private static final boolean MODEL_SYNC_IN_PROGRESS = false;
+ private static final boolean MODEL_SYNC_DONE = true;
+
private final InventoryPersistence inventoryPersistence;
private final SyncUtils syncUtils;
private final ModuleSyncService moduleSyncService;
- private final ConcurrentMap<String, Boolean> moduleSyncSemaphoreMap;
+ @Qualifier("moduleSyncSemaphores")
+ private final ConcurrentMap<String, Boolean> moduleSyncSemaphores;
private final LcmEventsCmHandleStateHandler lcmEventsCmHandleStateHandler;
@@ -100,10 +105,10 @@
}
private void updateModuleSyncSemaphoreMap(final String cmHandleId) {
- moduleSyncSemaphoreMap.replace(cmHandleId, true);
+ moduleSyncSemaphores.replace(cmHandleId, MODEL_SYNC_DONE);
}
private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) {
- return moduleSyncSemaphoreMap.putIfAbsent(cmHandleId, false) == null;
+ return moduleSyncSemaphores.putIfAbsent(cmHandleId, MODEL_SYNC_IN_PROGRESS) == null;
}
}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
index 2b7d3c9..64ce218 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
@@ -44,7 +44,6 @@
import org.onap.cps.ncmp.api.inventory.CmHandleState;
import org.onap.cps.ncmp.api.inventory.CompositeState;
import org.onap.cps.ncmp.api.inventory.DataStoreSyncState;
-import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
import org.onap.cps.ncmp.api.inventory.LockReasonCategory;
import org.onap.cps.spi.FetchDescendantsOption;
import org.onap.cps.spi.model.DataNode;
@@ -56,8 +55,6 @@
@Service
@RequiredArgsConstructor
public class SyncUtils {
- private final InventoryPersistence inventoryPersistence;
-
private final CmHandleQueries cmHandleQueries;
private final DmiDataOperations dmiDataOperations;
@@ -73,7 +70,7 @@
*/
public List<YangModelCmHandle> getAdvisedCmHandles() {
final List<DataNode> advisedCmHandlesAsDataNodeList = new ArrayList<>(
- cmHandleQueries.getCmHandlesByState(CmHandleState.ADVISED));
+ cmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED));
log.info("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodeList.size());
if (advisedCmHandlesAsDataNodeList.isEmpty()) {
return Collections.emptyList();
@@ -86,25 +83,26 @@
* First query data nodes for cm handles with CM Handle Operational Sync State in "UNSYNCHRONIZED" and
* randomly select a CM Handle and query the data nodes for CM Handle State in "READY".
*
- * @return a random yang model cm handle with State in READY and Operation Sync State in "UNSYNCHRONIZED",
- * return null if not found
+ * @return a randomized yang model cm handle list with State in READY and Operation Sync State in "UNSYNCHRONIZED",
+ * return empty list if not found
*/
- public YangModelCmHandle getAnUnSynchronizedReadyCmHandle() {
- final List<DataNode> unSynchronizedCmHandles = cmHandleQueries
- .getCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED);
- if (unSynchronizedCmHandles.isEmpty()) {
- return null;
- }
- Collections.shuffle(unSynchronizedCmHandles);
- for (final DataNode cmHandle : unSynchronizedCmHandles) {
- final String cmHandleId = cmHandle.getLeaves().get("id").toString();
- final List<DataNode> readyCmHandles = cmHandleQueries
- .getCmHandlesByIdAndState(cmHandleId, CmHandleState.READY);
- if (!readyCmHandles.isEmpty()) {
- return inventoryPersistence.getYangModelCmHandle(cmHandleId);
+ public List<YangModelCmHandle> getUnsynchronizedReadyCmHandles() {
+ final List<DataNode> unsynchronizedCmHandles = cmHandleQueries
+ .queryCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED);
+
+ final List<YangModelCmHandle> yangModelCmHandles = new ArrayList<>();
+ for (final DataNode unsynchronizedCmHandle : unsynchronizedCmHandles) {
+ final String cmHandleId = unsynchronizedCmHandle.getLeaves().get("id").toString();
+ if (cmHandleQueries.cmHandleHasState(cmHandleId, CmHandleState.READY)) {
+ yangModelCmHandles.addAll(
+ convertCmHandlesDataNodesToYangModelCmHandles(
+ Collections.singletonList(unsynchronizedCmHandle)));
}
}
- return null;
+
+ Collections.shuffle(yangModelCmHandles);
+
+ return yangModelCmHandles;
}
/**
@@ -113,9 +111,9 @@
* @return a random LOCKED yang model cm handle, return null if not found
*/
public List<YangModelCmHandle> getModuleSyncFailedCmHandles() {
- final List<DataNode> lockedCmHandlesAsDataNodeList = cmHandleQueries.getCmHandleDataNodesByCpsPath(
- "//lock-reason[@reason=\"LOCKED_MODULE_SYNC_FAILED\"]",
- FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
+ final List<DataNode> lockedCmHandlesAsDataNodeList = cmHandleQueries.queryCmHandleDataNodesByCpsPath(
+ "//lock-reason[@reason=\"LOCKED_MODULE_SYNC_FAILED\"]",
+ FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
return convertCmHandlesDataNodesToYangModelCmHandles(lockedCmHandlesAsDataNodeList);
}
@@ -136,8 +134,8 @@
}
}
compositeState.setLockReason(CompositeState.LockReason.builder()
- .details(String.format("Attempt #%d failed: %s", attempt, errorMessage))
- .lockReasonCategory(lockReasonCategory).build());
+ .details(String.format("Attempt #%d failed: %s", attempt, errorMessage))
+ .lockReasonCategory(lockReasonCategory).build());
}
@@ -150,8 +148,8 @@
public boolean isReadyForRetry(final CompositeState compositeState) {
int timeInMinutesUntilNextAttempt = 1;
final OffsetDateTime time =
- OffsetDateTime.parse(compositeState.getLastUpdateTime(),
- DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+ OffsetDateTime.parse(compositeState.getLastUpdateTime(),
+ DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
final Matcher matcher = retryAttemptPattern.matcher(compositeState.getLockReason().getDetails());
if (matcher.find()) {
timeInMinutesUntilNextAttempt = (int) Math.pow(2, Integer.parseInt(matcher.group(1)));
@@ -161,7 +159,7 @@
final int timeSinceLastAttempt = (int) Duration.between(time, OffsetDateTime.now()).toMinutes();
if (timeInMinutesUntilNextAttempt >= timeSinceLastAttempt) {
log.info("Time until next attempt is {} minutes: ",
- timeInMinutesUntilNextAttempt - timeSinceLastAttempt);
+ timeInMinutesUntilNextAttempt - timeSinceLastAttempt);
}
return timeSinceLastAttempt > timeInMinutesUntilNextAttempt;
}
@@ -174,8 +172,8 @@
*/
public String getResourceData(final String cmHandleId) {
final ResponseEntity<Object> resourceDataResponseEntity = dmiDataOperations.getResourceDataFromDmi(
- cmHandleId, DmiOperations.DataStoreEnum.PASSTHROUGH_OPERATIONAL,
- UUID.randomUUID().toString());
+ cmHandleId, DmiOperations.DataStoreEnum.PASSTHROUGH_OPERATIONAL,
+ UUID.randomUUID().toString());
if (resourceDataResponseEntity.getStatusCode().is2xxSuccessful()) {
return getFirstResource(resourceDataResponseEntity.getBody());
}
@@ -190,9 +188,10 @@
return jsonObjectMapper.asJsonString(Map.of(firstElement.getKey(), firstElement.getValue()));
}
- private List<YangModelCmHandle> convertCmHandlesDataNodesToYangModelCmHandles(
- final List<DataNode> cmHandlesAsDataNodeList) {
- return cmHandlesAsDataNodeList.stream().map(dataNode -> YangDataConverter.convertCmHandleToYangModel(dataNode,
- dataNode.getLeaves().get("id").toString())).collect(Collectors.toList());
+ private static List<YangModelCmHandle> convertCmHandlesDataNodesToYangModelCmHandles(
+ final List<DataNode> cmHandlesAsDataNodeList) {
+ return cmHandlesAsDataNodeList.stream()
+ .map(cmHandle -> YangDataConverter.convertCmHandleToYangModel(cmHandle,
+ cmHandle.getLeaves().get("id").toString())).collect(Collectors.toList());
}
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyCmHandlerQueryServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyCmHandlerQueryServiceSpec.groovy
index 19c5049..f1294ce 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyCmHandlerQueryServiceSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/NetworkCmProxyCmHandlerQueryServiceSpec.groovy
@@ -21,7 +21,6 @@
package org.onap.cps.ncmp.api.impl
import org.onap.cps.cpspath.parser.PathParsingException
-import org.onap.cps.ncmp.api.NetworkCmProxyCmHandlerQueryService
import org.onap.cps.ncmp.api.inventory.InventoryPersistence
import org.onap.cps.ncmp.api.inventory.CmHandleQueries
import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle
@@ -52,7 +51,7 @@
def conditionProperties = createConditionProperties('cmHandleWithCpsPath', [['cpsPath' : '/some/cps/path']])
cmHandleQueryParameters.setCmHandleQueryParameters([conditionProperties])
and: 'cmHandleQueries returns a non null query result'
- cmHandleQueries.getCmHandleDataNodesByCpsPath('/some/cps/path', FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS) >> [new DataNode(leaves: ['id':'some-cmhandle-id'])]
+ cmHandleQueries.queryCmHandleDataNodesByCpsPath('/some/cps/path', FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS) >> [new DataNode(leaves: ['id':'some-cmhandle-id'])]
and: 'CmHandleQueries returns cmHandles with the relevant query result'
cmHandleQueries.combineCmHandleQueries(*_) >> ['PNFDemo1': new NcmpServiceCmHandle(cmHandleId: 'PNFDemo1'), 'PNFDemo3': new NcmpServiceCmHandle(cmHandleId: 'PNFDemo3')]
when: 'the query is executed for both cm handle ids and details'
@@ -70,7 +69,7 @@
def conditionProperties = createConditionProperties('cmHandleWithCpsPath', [['cpsPath' : '/some/cps/path']])
cmHandleQueryParameters.setCmHandleQueryParameters([conditionProperties])
and: 'cmHandleQueries throws a path parsing exception'
- cmHandleQueries.getCmHandleDataNodesByCpsPath('/some/cps/path', FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS) >> { throw thrownException }
+ cmHandleQueries.queryCmHandleDataNodesByCpsPath('/some/cps/path', FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS) >> { throw thrownException }
when: 'the query is executed for both cm handle ids and details'
objectUnderTest.queryCmHandleIds(cmHandleQueryParameters)
objectUnderTest.queryCmHandles(cmHandleQueryParameters)
@@ -134,7 +133,7 @@
and: 'cmHandles are returned from the module names query'
inventoryPersistence.queryAnchors(['some-module-name']) >> anchorsForModuleQuery
and: 'cmHandleQueries returns a datanode result'
- 2 * cmHandleQueries.getCmHandleDataNodesByCpsPath(*_) >> [someCmHandleDataNode]
+ 2 * cmHandleQueries.queryCmHandleDataNodesByCpsPath(*_) >> [someCmHandleDataNode]
when: 'the query is executed for both cm handle ids and details'
def returnedCmHandlesJustIds = objectUnderTest.queryCmHandleIds(cmHandleQueryParameters)
def returnedCmHandlesWithData = objectUnderTest.queryCmHandles(cmHandleQueryParameters)
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy
index fe7ed9e..ea84b44 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy
@@ -31,19 +31,19 @@
class SynchronizationSemaphoresConfigSpec extends Specification {
@Autowired
- private Map<String, String> moduleSyncSemaphore;
+ private Map<String, Boolean> moduleSyncSemaphores;
@Autowired
- private Map<String, String> dataSyncSemaphore;
+ private Map<String, Boolean> dataSyncSemaphores;
def 'Embedded Sync Semaphores'() {
- expect: 'system is able to create an instance of ModuleSyncSemaphore'
- assert null != moduleSyncSemaphore
- and: 'system is able to create an instance of DataSyncSemaphore'
- assert null != dataSyncSemaphore
+ expect: 'system is able to create an instance of ModuleSyncSemaphores'
+ assert null != moduleSyncSemaphores
+ and: 'system is able to create an instance of DataSyncSemaphores'
+ assert null != dataSyncSemaphores
and: 'we have 2 instances'
assert Hazelcast.allHazelcastInstances.size() == 2
and: 'the names match'
- assert Hazelcast.allHazelcastInstances.name == ['moduleSyncSemaphore', 'dataSyncSemaphore']
+ assert Hazelcast.allHazelcastInstances.name == ['moduleSyncSemaphores', 'dataSyncSemaphores']
}
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CmHandleQueriesSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CmHandleQueriesSpec.groovy
index 10a5d62..ff17330 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CmHandleQueriesSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/CmHandleQueriesSpec.groovy
@@ -92,31 +92,31 @@
cpsDataPersistenceService.queryDataNodes('NCMP-Admin', 'ncmp-dmi-registry',
'//state[@cm-handle-state="ADVISED"]/ancestor::cm-handles', INCLUDE_ALL_DESCENDANTS) >> sampleDataNodes
when: 'cm handles are fetched by state'
- def result = objectUnderTest.getCmHandlesByState(cmHandleState)
+ def result = objectUnderTest.queryCmHandlesByState(cmHandleState)
then: 'the returned result matches the result from the persistence service'
assert result == sampleDataNodes
}
- def 'Get Cm Handles By State and Cm-Handle Id'() {
+ def 'Get Cm Handles state by Cm-Handle Id'() {
given: 'a cm handle state to query'
def cmHandleState = CmHandleState.READY
and: 'cps data service returns a list of data nodes'
- cpsDataPersistenceService.queryDataNodes('NCMP-Admin', 'ncmp-dmi-registry',
- '//cm-handles[@id=\'some-cm-handle\']/state[@cm-handle-state="'+ 'READY'+'"]/ancestor::cm-handles', OMIT_DESCENDANTS) >> sampleDataNodes
+ cpsDataPersistenceService.getDataNode('NCMP-Admin', 'ncmp-dmi-registry',
+ '/dmi-registry/cm-handles[@id=\'some-cm-handle\']/state', OMIT_DESCENDANTS) >> new DataNode(leaves: ['cm-handle-state': 'READY'])
when: 'cm handles are fetched by state and id'
- def result = objectUnderTest.getCmHandlesByIdAndState('some-cm-handle', cmHandleState)
+ def result = objectUnderTest.getCmHandleState('some-cm-handle')
then: 'the returned result is a list of data nodes returned by cps data service'
- assert result == sampleDataNodes
+ assert result == new DataNode(leaves: ['cm-handle-state': 'READY'])
}
- def 'Get Cm Handles By Operational Sync State : UNSYNCHRONIZED'() {
+ def 'Retrieve Cm Handles By Operational Sync State : UNSYNCHRONIZED'() {
given: 'a cm handle state to query'
def cmHandleState = CmHandleState.READY
and: 'cps data service returns a list of data nodes'
cpsDataPersistenceService.queryDataNodes('NCMP-Admin', 'ncmp-dmi-registry',
'//state/datastores/operational[@sync-state="'+'UNSYNCHRONIZED'+'"]/ancestor::cm-handles', OMIT_DESCENDANTS) >> sampleDataNodes
when: 'cm handles are fetched by the UNSYNCHRONIZED operational sync state'
- def result = objectUnderTest.getCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED)
+ def result = objectUnderTest.queryCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED)
then: 'the returned result is a list of data nodes returned by cps data service'
assert result == sampleDataNodes
}
@@ -130,7 +130,7 @@
cpsPath + '/ancestor::cm-handles', INCLUDE_ALL_DESCENDANTS)
>> Arrays.asList(cmHandleDataNode)
when: 'get cm handles by cps path is invoked'
- def result = objectUnderTest.getCmHandleDataNodesByCpsPath(cpsPath, INCLUDE_ALL_DESCENDANTS)
+ def result = objectUnderTest.queryCmHandleDataNodesByCpsPath(cpsPath, INCLUDE_ALL_DESCENDANTS)
then: 'the returned result is a list of data nodes returned by cps data service'
assert result.contains(cmHandleDataNode)
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/DataSyncSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdogSpec.groovy
similarity index 80%
rename from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/DataSyncSpec.groovy
rename to cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdogSpec.groovy
index 650a779..6053819 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/DataSyncSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdogSpec.groovy
@@ -26,10 +26,11 @@
import org.onap.cps.ncmp.api.inventory.CompositeState
import org.onap.cps.ncmp.api.inventory.InventoryPersistence
import org.onap.cps.ncmp.api.inventory.DataStoreSyncState
-import spock.lang.Shared
import spock.lang.Specification
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.ConcurrentMap
-class DataSyncSpec extends Specification {
+class DataSyncWatchdogSpec extends Specification {
def mockInventoryPersistence = Mock(InventoryPersistence)
@@ -37,10 +38,11 @@
def mockSyncUtils = Mock(SyncUtils)
- @Shared
+ def stubbedMap = Stub(ConcurrentMap)
+
def jsonString = '{"stores:bookstore":{"categories":[{"code":"01"}]}}'
- def objectUnderTest = new DataSyncWatchdog(mockInventoryPersistence, mockCpsDataService, mockSyncUtils)
+ def objectUnderTest = new DataSyncWatchdog(mockInventoryPersistence, mockCpsDataService, mockSyncUtils, stubbedMap as ConcurrentHashMap)
def compositeState = getCompositeState()
@@ -52,7 +54,7 @@
given: 'sample resource data'
def resourceData = jsonString
and: 'sync utilities return a cm handle twice'
- mockSyncUtils.getAnUnSynchronizedReadyCmHandle() >>> [yangModelCmHandle1, yangModelCmHandle2, null]
+ mockSyncUtils.getUnsynchronizedReadyCmHandles() >> [yangModelCmHandle1, yangModelCmHandle2]
when: 'data sync poll is executed'
objectUnderTest.executeUnSynchronizedReadyCmHandlePoll()
then: 'the inventory persistence cm handle returns a composite state for the first cm handle'
@@ -73,24 +75,18 @@
1 * mockInventoryPersistence.saveCmHandleState('some-cm-handle-2', compositeState)
}
- def 'Schedule Data Sync for Cm Handle State in READY and Operational Sync State in UNSYNCHRONIZED which return empty data from Node because #scenario'() {
- given: 'a yang model cm handle'
- def yangModelCmHandle = new YangModelCmHandle(id: 'some-cm-handle', compositeState: new CompositeState(dataSyncEnabled: dataSyncEnabled))
- and: 'sync utilities returns a single cm handle'
- mockSyncUtils.getAnUnSynchronizedReadyCmHandle() >>> [yangModelCmHandle, null]
+ def 'Schedule Data Sync for Cm Handle State in READY and Operational Sync State in UNSYNCHRONIZED which return empty data from Node'() {
+ given: 'cm handles in an ready state and operational sync state in unsynchronized'
+ and: 'sync utilities return a cm handle twice'
+ mockSyncUtils.getUnsynchronizedReadyCmHandles() >> [yangModelCmHandle1]
when: 'data sync poll is executed'
objectUnderTest.executeUnSynchronizedReadyCmHandlePoll()
then: 'the inventory persistence cm handle returns a composite state for the first cm handle'
- 1 * mockInventoryPersistence.getCmHandleState('some-cm-handle') >> compositeState
+ 1 * mockInventoryPersistence.getCmHandleState('some-cm-handle-1') >> compositeState
and: 'the sync util returns first resource data'
- 1 * mockSyncUtils.getResourceData('some-cm-handle') >> resourceData
+ 1 * mockSyncUtils.getResourceData('some-cm-handle-1') >> null
and: 'the cm-handle data is not saved'
0 * mockCpsDataService.saveData('NFP-Operational', 'some-cm-handle-1', jsonString, _)
- where:
- scenario | dataSyncEnabled | resourceData
- 'data sync is not enabled' | false | jsonString
- 'resource data is null' | true | null
- 'data sync is not enabled and resource data is null' | false | null
}
def createSampleYangModelCmHandle(cmHandleId) {
@@ -100,7 +96,7 @@
def getCompositeState() {
def cmHandleState = CmHandleState.READY
- def compositeState = new CompositeState(cmHandleState: cmHandleState, dataSyncEnabled: true)
+ def compositeState = new CompositeState(cmHandleState: cmHandleState)
compositeState.setDataStores(CompositeState.DataStores.builder()
.operationalDataStore(CompositeState.Operational.builder().dataStoreSyncState(DataStoreSyncState.SYNCHRONIZED)
.build()).build())
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy
index fb4ca39..52fb110 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy
@@ -26,6 +26,7 @@
import org.onap.cps.ncmp.api.impl.operations.DmiDataOperations
import org.onap.cps.ncmp.api.impl.operations.DmiOperations
import org.onap.cps.ncmp.api.inventory.CmHandleQueries
+import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
import org.onap.cps.ncmp.api.inventory.CmHandleState
import org.onap.cps.ncmp.api.inventory.CompositeState
import org.onap.cps.ncmp.api.inventory.CompositeStateBuilder
@@ -54,7 +55,7 @@
def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
- def objectUnderTest = new SyncUtils(mockInventoryPersistence, mockCmHandleQueries, mockDmiDataOperations, jsonObjectMapper)
+ def objectUnderTest = new SyncUtils(mockCmHandleQueries, mockDmiDataOperations, jsonObjectMapper)
@Shared
def formattedDateAndTime = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(OffsetDateTime.now())
@@ -68,7 +69,7 @@
def 'Get an advised Cm-Handle where ADVISED cm handle #scenario'() {
given: 'the inventory persistence service returns a collection of data nodes'
- mockCmHandleQueries.getCmHandlesByState(CmHandleState.ADVISED) >> dataNodeCollection
+ mockCmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED) >> dataNodeCollection
and: 'we have some additional (dmi, private) properties'
dataNodeAdditionalProperties.xpath = dataNode.xpath + '/additional-properties[@name="dmiProp1"]'
dataNode.childDataNodes = [dataNodeAdditionalProperties]
@@ -106,7 +107,7 @@
def 'Get all locked Cm-Handle where Lock Reason is LOCKED_MODULE_SYNC_FAILED cm handle #scenario'() {
given: 'the cps (persistence service) returns a collection of data nodes'
- mockCmHandleQueries.getCmHandleDataNodesByCpsPath(
+ mockCmHandleQueries.queryCmHandleDataNodesByCpsPath(
'//lock-reason[@reason="LOCKED_MODULE_SYNC_FAILED"]',
FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS) >> [dataNode]
when: 'get locked Misbehaving cm handle is called'
@@ -132,21 +133,21 @@
}
- def 'Get a Cm-Handle where Operational Sync state is UnSynchronized and Cm-handle state is READY and #scenario'() {
+ def 'Get a Cm-Handle where #scenario'() {
given: 'the inventory persistence service returns a collection of data nodes'
- mockCmHandleQueries.getCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED) >> unSynchronizedDataNodes
- mockCmHandleQueries.getCmHandlesByIdAndState("cm-handle-123", CmHandleState.READY) >> readyDataNodes
+ mockCmHandleQueries.queryCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED) >> unSynchronizedDataNodes
+ mockCmHandleQueries.cmHandleHasState('cm-handle-123', CmHandleState.READY) >> cmHandleHasState
when: 'get advised cm handles are fetched'
- objectUnderTest.getAnUnSynchronizedReadyCmHandle()
+ def yangModelCollection = objectUnderTest.getUnsynchronizedReadyCmHandles()
then: 'the returned data node collection is the correct size'
- readyDataNodes.size() == expectedDataNodeSize
- and: 'get yang model cm handles is invoked the correct number of times'
- expectedCallsToGetYangModelCmHandle * mockInventoryPersistence.getYangModelCmHandle('cm-handle-123')
+ yangModelCollection.size() == expectedDataNodeSize
+ and: 'the result contains the correct data'
+ yangModelCollection.stream().map(yangModel -> yangModel.id).collect(Collectors.toSet()) == expectedYangModelCollectionIds
where: 'the following scenarios are used'
- scenario | unSynchronizedDataNodes | readyDataNodes || expectedCallsToGetYangModelCmHandle | expectedDataNodeSize
- 'exists' | [dataNode] | [dataNode] || 1 | 1
- 'unsynchronized exist but not ready' | [dataNode] | [] || 0 | 0
- 'does not exist' | [] | [] || 0 | 0
+ scenario | unSynchronizedDataNodes | cmHandleHasState || expectedDataNodeSize | expectedYangModelCollectionIds
+ 'a Cm-Handle unsynchronized and ready' | [dataNode] | true || 1 | ['cm-handle-123'] as Set
+ 'a Cm-Handle unsynchronized but not ready' | [dataNode] | false || 0 | [] as Set
+ 'all Cm-Handle synchronized' | [] | false || 0 | [] as Set
}
def 'Get resource data through DMI Operations #scenario'() {