Performance Improvement: Use hazelcast blocking queue

- Introducing hazelcast for queue and progress map
- process batch of 100 at the time
- decreased module sync watchdog sleeptime to 5 seconds
- separate module sync tasks in new class and some other async preparations and easier testing
- tests for batching in module sync watchdog
- remove qualifiers annotation (support) where no longer needed

Issue-ID: CPS-1210
Issue-ID: CPS-1126
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
Change-Id: I0a7d3755bf774e27c5688741bddb01f427d4a8a7
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml
index 1755134..f7fa735 100644
--- a/cps-application/src/main/resources/application.yml
+++ b/cps-application/src/main/resources/application.yml
@@ -167,7 +167,7 @@
 

 timers:

     advised-modules-sync:

-        sleep-time-ms: 30000

+        sleep-time-ms: 5000

     locked-modules-sync:

         sleep-time-ms: 300000

     cm-handle-data-sync:

diff --git a/cps-ncmp-service/lombok.config b/cps-ncmp-service/lombok.config
index b60a192..1fba85b 100644
--- a/cps-ncmp-service/lombok.config
+++ b/cps-ncmp-service/lombok.config
@@ -18,4 +18,3 @@
 
 config.stopBubbling = true
 lombok.addLombokGeneratedAnnotation = true
-lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier
\ No newline at end of file
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java
new file mode 100644
index 0000000..abde4c2
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java
@@ -0,0 +1,113 @@
+/*
+ * ============LICENSE_START========================================================
+ *  Copyright (C) 2022 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.config.embeddedcache;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.MapConfig;
+import com.hazelcast.config.NamedConfig;
+import com.hazelcast.config.QueueConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.onap.cps.spi.model.DataNode;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Core infrastructure of the hazelcast distributed caches for Module Sync and Data Sync use cases.
+ */
+@Configuration
+public class SynchronizationCacheConfig {
+
+    private static final QueueConfig commonQueueConfig = createQueueConfig();
+    private static final MapConfig moduleSyncStartedConfig =
+        createMapConfig("moduleSyncStartedConfig", TimeUnit.MINUTES.toSeconds(1));
+    private static final MapConfig dataSyncSemaphoresConfig =
+        createMapConfig("dataSyncSemaphoresConfig", TimeUnit.MINUTES.toSeconds(30));
+
+    /**
+     * Module Sync Distributed Queue Instance.
+     *
+     * @return queue of cm handles (data nodes) that need module sync
+     */
+    @Bean
+    public BlockingQueue<DataNode> moduleSyncWorkQueue() {
+        return createHazelcastInstance("moduleSyncWorkQueue", commonQueueConfig)
+            .getQueue("moduleSyncWorkQueue");
+    }
+
+    /**
+     * Module Sync started (and maybe finished) on cm handles (ids).
+     *
+     * @return Map of cm handles (ids) and objects (not used really) for which module sync has started or been completed
+     */
+    @Bean
+    public Map<String, Object> moduleSyncStartedOnCmHandles() {
+        return createHazelcastInstance("moduleSyncStartedOnCmHandles", moduleSyncStartedConfig)
+            .getMap("moduleSyncStartedOnCmHandles");
+    }
+
+    /**
+     * Data Sync Distributed Map Instance.
+     *
+     * @return configured map of data sync semaphores
+     */
+    @Bean
+    public Map<String, Boolean> dataSyncSemaphores() {
+        return createHazelcastInstance("dataSyncSemaphores", dataSyncSemaphoresConfig)
+            .getMap("dataSyncSemaphores");
+    }
+
+    private HazelcastInstance createHazelcastInstance(
+        final String hazelcastInstanceName, final NamedConfig namedConfig) {
+        return Hazelcast.newHazelcastInstance(initializeConfig(hazelcastInstanceName, namedConfig));
+    }
+
+    private Config initializeConfig(final String instanceName, final NamedConfig namedConfig) {
+        final Config config = new Config(instanceName);
+        if (namedConfig instanceof MapConfig) {
+            config.addMapConfig((MapConfig) namedConfig);
+        }
+        if (namedConfig instanceof QueueConfig) {
+            config.addQueueConfig((QueueConfig) namedConfig);
+        }
+        config.setClusterName("synchronization-caches");
+        return config;
+    }
+
+    private static QueueConfig createQueueConfig() {
+        final QueueConfig commonQueueConfig = new QueueConfig("defaultQueueConfig");
+        commonQueueConfig.setBackupCount(3);
+        commonQueueConfig.setAsyncBackupCount(3);
+        return commonQueueConfig;
+    }
+
+    private static MapConfig createMapConfig(final String configName, final long timeToLiveSeconds) {
+        final MapConfig mapConfig = new MapConfig(configName);
+        mapConfig.setBackupCount(3);
+        mapConfig.setAsyncBackupCount(3);
+        mapConfig.setTimeToLiveSeconds((int) timeToLiveSeconds);
+        return mapConfig;
+    }
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java
deleted file mode 100644
index 571558a..0000000
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfig.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *  ===========LICENSE_START========================================================
- *  Copyright (C) 2022 Nordix Foundation
- *  ================================================================================
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *        http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- *  SPDX-License-Identifier: Apache-2.0
- *  ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.config.embeddedcache;
-
-import com.hazelcast.config.Config;
-import com.hazelcast.config.MapConfig;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-/**
- * Core infrastructure of the hazelcast distributed map for Module Sync and Data Sync use cases.
- */
-@Configuration
-public class SynchronizationSemaphoresConfig {
-
-    private static final int TIME_TO_LIVE_IN_SECONDS = (int) TimeUnit.MINUTES.toSeconds(30);
-
-    /**
-     * Module Sync Distributed Map Instance.
-     *
-     * @return configured map of module sync semaphores
-     */
-    @Bean
-    public ConcurrentMap<String, Boolean> moduleSyncSemaphores() {
-        return createHazelcastInstance("moduleSyncSemaphores", "moduleSyncSemaphoresConfig")
-                .getMap("moduleSyncSemaphores");
-    }
-
-    /**
-     * Data Sync Distributed Map Instance.
-     *
-     * @return configured map of data sync semaphores
-     */
-    @Bean
-    public ConcurrentMap<String, Boolean> dataSyncSemaphores() {
-        return createHazelcastInstance("dataSyncSemaphores", "dataSyncSemaphoresConfig")
-                .getMap("dataSyncSemaphores");
-    }
-
-    private HazelcastInstance createHazelcastInstance(
-            final String hazelcastInstanceName, final String configMapName) {
-        return Hazelcast.newHazelcastInstance(
-                initializeDefaultMapConfig(hazelcastInstanceName, configMapName));
-    }
-
-    private Config initializeDefaultMapConfig(final String instanceName, final String configName) {
-        final Config config = new Config(instanceName);
-        final MapConfig mapConfig = new MapConfig(configName);
-        mapConfig.setTimeToLiveSeconds(TIME_TO_LIVE_IN_SECONDS);
-        mapConfig.setBackupCount(3);
-        mapConfig.setAsyncBackupCount(3);
-        config.addMapConfig(mapConfig);
-        config.setClusterName("synchronization-semaphores");
-        return config;
-    }
-}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java
index 45ba078..107f8a0 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/DataSyncWatchdog.java
@@ -21,7 +21,7 @@
 package org.onap.cps.ncmp.api.inventory.sync;
 
 import java.time.OffsetDateTime;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
 import java.util.function.Consumer;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -29,7 +29,6 @@
 import org.onap.cps.ncmp.api.inventory.CompositeState;
 import org.onap.cps.ncmp.api.inventory.DataStoreSyncState;
 import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
-import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 
@@ -47,8 +46,7 @@
 
     private final SyncUtils syncUtils;
 
-    @Qualifier("dataSyncSemaphores")
-    private final ConcurrentMap<String, Boolean> dataSyncSemaphores;
+    private final Map<String, Boolean> dataSyncSemaphores;
 
     /**
      * Execute Cm Handle poll which queries the cm handle state in 'READY' and Operational Datastore Sync State in
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java
index c574aa6..7f61c47 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncService.java
@@ -86,19 +86,17 @@
     }
 
     /**
-     * Deletes the SchemaSet for provided cmHandle if the SchemaSet Exists.
+     * Deletes the SchemaSet for schema set id if the SchemaSet Exists.
      *
-     * @param yangModelCmHandle the yang model of cm handle.
+     * @param schemaSetId the schema set id to be deleted
      */
-    public void deleteSchemaSetIfExists(final YangModelCmHandle yangModelCmHandle) {
-        final String schemaSetAndAnchorName = yangModelCmHandle.getId();
+    public void deleteSchemaSetIfExists(final String schemaSetId) {
         try {
-            cpsModuleService.deleteSchemaSet(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetAndAnchorName,
+            cpsModuleService.deleteSchemaSet(NFP_OPERATIONAL_DATASTORE_DATASPACE_NAME, schemaSetId,
                 CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED);
-            log.debug("SchemaSet for {} has been deleted. Ready to be recreated.", schemaSetAndAnchorName);
+            log.debug("SchemaSet for {} has been deleted. Ready to be recreated.", schemaSetId);
         } catch (final SchemaSetNotFoundException e) {
-            log.debug("No SchemaSet for {}. Assuming CmHandle has not been previously Module Synced.",
-                schemaSetAndAnchorName);
+            log.debug("No SchemaSet for {}. Assuming CmHandle has not been previously Module Synced.", schemaSetId);
         }
     }
 
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java
new file mode 100644
index 0000000..5e26650
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java
@@ -0,0 +1,113 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2022 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.inventory.sync;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler;
+import org.onap.cps.ncmp.api.impl.utils.YangDataConverter;
+import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
+import org.onap.cps.ncmp.api.inventory.CmHandleState;
+import org.onap.cps.ncmp.api.inventory.CompositeState;
+import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
+import org.onap.cps.ncmp.api.inventory.LockReasonCategory;
+import org.onap.cps.spi.model.DataNode;
+import org.springframework.stereotype.Component;
+
+@RequiredArgsConstructor
+@Component
+@Slf4j
+public class ModuleSyncTasks {
+    private final InventoryPersistence inventoryPersistence;
+    private final SyncUtils syncUtils;
+    private final ModuleSyncService moduleSyncService;
+    private final LcmEventsCmHandleStateHandler lcmEventsCmHandleStateHandler;
+
+    private static final CompletableFuture<Void> COMPLETED_FUTURE = CompletableFuture.completedFuture(null);
+
+    /**
+     * Perform module sync on a batch of cm handles.
+     *
+     * @param cmHandlesAsDataNodes a batch of Data nodes representing cm handles to perform module sync on
+     * @return completed future to handle post-processing
+     */
+    public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes) {
+        final Map<YangModelCmHandle, CmHandleState> cmHandelStatePerCmHandle = new HashMap<>();
+        for (final DataNode cmHandleAsDataNode : cmHandlesAsDataNodes) {
+            final String cmHandleId = String.valueOf(cmHandleAsDataNode.getLeaves().get("id"));
+            final YangModelCmHandle yangModelCmHandle =
+                YangDataConverter.convertCmHandleToYangModel(cmHandleAsDataNode, cmHandleId);
+            final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
+            try {
+                moduleSyncService.deleteSchemaSetIfExists(cmHandleId);
+                moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle);
+                cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.READY);
+            } catch (final Exception e) {
+                syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
+                    LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
+                setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason());
+                cmHandelStatePerCmHandle.put(yangModelCmHandle, CmHandleState.LOCKED);
+            }
+            log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
+        }
+        updateCmHandlesStateBatch(cmHandelStatePerCmHandle);
+        return COMPLETED_FUTURE;
+    }
+
+    /**
+     * Reset state to "ADVISED" for any previously failed cm handles.
+     *
+     * @param failedCmHandles previously failed (locked) cm handles
+     * @return completed future to handle post-processing
+     */
+    public CompletableFuture<Void> resetFailedCmHandles(final List<YangModelCmHandle> failedCmHandles) {
+        final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(failedCmHandles.size());
+        for (final YangModelCmHandle failedCmHandle : failedCmHandles) {
+            final CompositeState compositeState = failedCmHandle.getCompositeState();
+            final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState);
+            if (isReadyForRetry) {
+                log.debug("Reset cm handle {} state to ADVISED to be re-attempted by module-sync watchdog",
+                    failedCmHandle.getId());
+                cmHandleStatePerCmHandle.put(failedCmHandle, CmHandleState.ADVISED);
+            }
+        }
+        updateCmHandlesStateBatch(cmHandleStatePerCmHandle);
+        return COMPLETED_FUTURE;
+    }
+
+    private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle,
+                                        final CompositeState.LockReason lockReason) {
+        advisedCmHandle.getCompositeState().setLockReason(lockReason);
+    }
+
+    private void updateCmHandlesStateBatch(final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle) {
+        // To be refactored as part of CPS-1231; Use state-save-batch capability (depends sub-task12, 13)
+        for (final Map.Entry<YangModelCmHandle, CmHandleState> entry : cmHandleStatePerCmHandle.entrySet()) {
+            lcmEventsCmHandleStateHandler.updateCmHandleState(entry.getKey(), entry.getValue());
+        }
+    }
+
+}
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
index be811a1..8074fe6 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
@@ -21,17 +21,16 @@
 
 package org.onap.cps.ncmp.api.inventory.sync;
 
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler;
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle;
-import org.onap.cps.ncmp.api.inventory.CmHandleState;
-import org.onap.cps.ncmp.api.inventory.CompositeState;
-import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
-import org.onap.cps.ncmp.api.inventory.LockReasonCategory;
-import org.springframework.beans.factory.annotation.Qualifier;
+import org.onap.cps.spi.model.DataNode;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
@@ -40,75 +39,75 @@
 @Component
 public class ModuleSyncWatchdog {
 
-    private static final boolean MODEL_SYNC_IN_PROGRESS = false;
-    private static final boolean MODEL_SYNC_DONE = true;
-
-    private final InventoryPersistence inventoryPersistence;
-
     private final SyncUtils syncUtils;
+    private final BlockingQueue<DataNode> moduleSyncWorkQueue;
+    private final Map<String, Object> moduleSyncStartedOnCmHandles;
+    private final ModuleSyncTasks moduleSyncTasks;
 
-    private final ModuleSyncService moduleSyncService;
-
-    @Qualifier("moduleSyncSemaphores")
-    private final ConcurrentMap<String, Boolean> moduleSyncSemaphores;
-
-    private final LcmEventsCmHandleStateHandler lcmEventsCmHandleStateHandler;
+    private static final int MODULE_SYNC_BATCH_SIZE = 100;
+    private static final long PREVENT_CPU_BURN_WAIT_TIME_MILLIS = 10;
+    private static final String VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP = "Started";
 
     /**
      * Execute Cm Handle poll which changes the cm handle state from 'ADVISED' to 'READY'.
      */
-    @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}")
-    public void executeAdvisedCmHandlePoll() {
-        syncUtils.getAdvisedCmHandles().forEach(advisedCmHandle -> {
-            final String cmHandleId = advisedCmHandle.getId();
-            if (hasPushedIntoSemaphoreMap(cmHandleId)) {
-                log.debug("executing module sync on {}", cmHandleId);
-                final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
-                try {
-                    moduleSyncService.deleteSchemaSetIfExists(advisedCmHandle);
-                    moduleSyncService.syncAndCreateSchemaSetAndAnchor(advisedCmHandle);
-                    lcmEventsCmHandleStateHandler.updateCmHandleState(advisedCmHandle, CmHandleState.READY);
-                    updateModuleSyncSemaphoreMap(cmHandleId);
-                } catch (final Exception e) {
-                    syncUtils.updateLockReasonDetailsAndAttempts(compositeState,
-                            LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, e.getMessage());
-                    setCmHandleStateLocked(advisedCmHandle, compositeState.getLockReason());
-                }
-                log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
-            } else {
-                log.debug("{} already processed by another instance", cmHandleId);
-            }
-        });
-        log.debug("No Cm-Handles currently found in an ADVISED state");
+    @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:5000}")
+    public void moduleSyncAdvisedCmHandles() {
+        populateWorkQueueIfNeeded();
+        while (!moduleSyncWorkQueue.isEmpty()) {
+            final Collection<DataNode> nextBatch = prepareNextBatch();
+            moduleSyncTasks.performModuleSync(nextBatch);
+            preventBusyWait();
+        }
     }
 
     /**
-     * Execute Cm Handle poll which changes the cm handle state from 'LOCKED' to 'ADVISED'.
+     * Find any failed (locked) cm handles and change state back to 'ADVISED'.
      */
     @Scheduled(fixedDelayString = "${timers.locked-modules-sync.sleep-time-ms:300000}")
-    public void executeLockedCmHandlePoll() {
-        final List<YangModelCmHandle> lockedCmHandles = syncUtils.getModuleSyncFailedCmHandles();
-        for (final YangModelCmHandle lockedCmHandle : lockedCmHandles) {
-            final CompositeState compositeState = lockedCmHandle.getCompositeState();
-            final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState);
-            if (isReadyForRetry) {
-                log.debug("Reset cm handle {} state to ADVISED to re-attempt module-sync", lockedCmHandle.getId());
-                lcmEventsCmHandleStateHandler.updateCmHandleState(lockedCmHandle, CmHandleState.ADVISED);
+    public void resetPreviouslyFailedCmHandles() {
+        final List<YangModelCmHandle> failedCmHandles = syncUtils.getModuleSyncFailedCmHandles();
+        moduleSyncTasks.resetFailedCmHandles(failedCmHandles);
+    }
+
+    private void preventBusyWait() {
+        // This method isn't really needed until CPS-1200 Performance Improvement: Watchdog Parallel execution
+        // but leaving here to minimize impacts on this class for that Jira
+        try {
+            TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS);
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private void populateWorkQueueIfNeeded() {
+        if (moduleSyncWorkQueue.isEmpty()) {
+            final List<DataNode> advisedCmHandles = syncUtils.getAdvisedCmHandles();
+            for (final DataNode advisedCmHandle : advisedCmHandles) {
+                if (!moduleSyncWorkQueue.offer(advisedCmHandle)) {
+                    log.warn("Unable to add cm handle {} to the work queue", advisedCmHandle.getLeaves().get("id"));
+                }
             }
         }
     }
 
-    private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle,
-            final CompositeState.LockReason lockReason) {
-        advisedCmHandle.getCompositeState().setLockReason(lockReason);
-        lcmEventsCmHandleStateHandler.updateCmHandleState(advisedCmHandle, CmHandleState.LOCKED);
+    private Collection<DataNode> prepareNextBatch() {
+        final Collection<DataNode> nextBatchCandidates = new HashSet<>(MODULE_SYNC_BATCH_SIZE);
+        final Collection<DataNode> nextBatch = new HashSet<>(MODULE_SYNC_BATCH_SIZE);
+        moduleSyncWorkQueue.drainTo(nextBatchCandidates, MODULE_SYNC_BATCH_SIZE);
+        log.debug("nextBatchCandidates size : {}", nextBatchCandidates.size());
+        for (final DataNode batchCandidate : nextBatchCandidates) {
+            final String cmHandleId = String.valueOf(batchCandidate.getLeaves().get("id"));
+            final boolean alreadyAddedToInProgressMap = VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP
+                .equals(moduleSyncStartedOnCmHandles.putIfAbsent(cmHandleId, VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP));
+            if (alreadyAddedToInProgressMap) {
+                log.debug("module sync for {} already in progress by other instance", cmHandleId);
+            } else {
+                nextBatch.add(batchCandidate);
+            }
+        }
+        log.debug("nextBatch size : {}", nextBatch.size());
+        return nextBatch;
     }
 
-    private void updateModuleSyncSemaphoreMap(final String cmHandleId) {
-        moduleSyncSemaphores.replace(cmHandleId, MODEL_SYNC_DONE);
-    }
-
-    private boolean hasPushedIntoSemaphoreMap(final String cmHandleId) {
-        return moduleSyncSemaphores.putIfAbsent(cmHandleId, MODEL_SYNC_IN_PROGRESS) == null;
-    }
 }
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
index 16fb8f4..537f501 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
@@ -64,19 +64,14 @@
     private static final Pattern retryAttemptPattern = Pattern.compile("^Attempt #(\\d+) failed:");
 
     /**
-     * Query data nodes for cm handles with an "ADVISED" cm handle state, and select a random entry for processing.
+     * Query data nodes for cm handles with an "ADVISED" cm handle state.
      *
-     * @return a randomized yang model cm handle list with ADVISED state, return empty list if not found
+     * @return cm handles (data nodes) in ADVISED state (empty list if none found)
      */
-    public List<YangModelCmHandle> getAdvisedCmHandles() {
-        final List<DataNode> advisedCmHandlesAsDataNodeList = new ArrayList<>(
-                cmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED));
-        log.debug("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodeList.size());
-        if (advisedCmHandlesAsDataNodeList.isEmpty()) {
-            return Collections.emptyList();
-        }
-        Collections.shuffle(advisedCmHandlesAsDataNodeList);
-        return convertCmHandlesDataNodesToYangModelCmHandles(advisedCmHandlesAsDataNodeList);
+    public List<DataNode> getAdvisedCmHandles() {
+        final List<DataNode> advisedCmHandlesAsDataNodes = cmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED);
+        log.debug("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodes.size());
+        return advisedCmHandlesAsDataNodes;
     }
 
     /**
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy
new file mode 100644
index 0000000..80aa81b
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfigSpec.groovy
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START========================================================
+ *  Copyright (C) 2022 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+package org.onap.cps.ncmp.api.impl.config.embeddedcache
+import com.hazelcast.core.Hazelcast
+import org.onap.cps.spi.model.DataNode
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.test.context.ContextConfiguration
+import spock.lang.Specification
+import java.util.concurrent.BlockingQueue
+
+@SpringBootTest
+@ContextConfiguration(classes = [SynchronizationCacheConfig])
+class SynchronizationCacheConfigSpec extends Specification {
+
+    @Autowired
+    private BlockingQueue<DataNode> moduleSyncWorkQueue
+
+    @Autowired
+    private Map<String, Object> moduleSyncStartedOnCmHandles
+
+    @Autowired
+    private Map<String, Boolean> dataSyncSemaphores
+
+    def 'Embedded (hazelcast) Caches for Module and Data Sync.'() {
+        expect: 'system is able to create an instance of the Module Sync Work Queue'
+            assert null != moduleSyncWorkQueue
+        and: 'system is able to create an instance of a map to hold cm handles which have started (and maybe finished) module sync'
+            assert null != moduleSyncStartedOnCmHandles
+        and: 'system is able to create an instance of a map to hold data sync semaphores'
+            assert null != dataSyncSemaphores
+        and: 'there 3 instances'
+            assert Hazelcast.allHazelcastInstances.size() == 3
+        and: 'they have the correct names (in any order)'
+            assert Hazelcast.allHazelcastInstances.name.containsAll('moduleSyncWorkQueue', 'moduleSyncStartedOnCmHandles', 'dataSyncSemaphores' )
+    }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy
deleted file mode 100644
index ea84b44..0000000
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationSemaphoresConfigSpec.groovy
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * ============LICENSE_START========================================================
- *  Copyright (C) 2022 Nordix Foundation
- *  ================================================================================
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *        http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- *  SPDX-License-Identifier: Apache-2.0
- *  ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.api.impl.config.embeddedcache
-
-import com.hazelcast.core.Hazelcast
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.boot.test.context.SpringBootTest
-import org.springframework.test.context.ContextConfiguration
-import spock.lang.Specification
-
-@SpringBootTest
-@ContextConfiguration(classes = [SynchronizationSemaphoresConfig])
-class SynchronizationSemaphoresConfigSpec extends Specification {
-
-    @Autowired
-    private Map<String, Boolean> moduleSyncSemaphores;
-
-    @Autowired
-    private Map<String, Boolean> dataSyncSemaphores;
-
-    def 'Embedded Sync Semaphores'() {
-        expect: 'system is able to create an instance of ModuleSyncSemaphores'
-            assert null != moduleSyncSemaphores
-        and: 'system is able to create an instance of DataSyncSemaphores'
-            assert null != dataSyncSemaphores
-        and: 'we have 2 instances'
-            assert Hazelcast.allHazelcastInstances.size() == 2
-        and: 'the names match'
-            assert Hazelcast.allHazelcastInstances.name == ['moduleSyncSemaphores', 'dataSyncSemaphores']
-    }
-}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncServiceSpec.groovy
index 6a2fbe8..78da7eb 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncServiceSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncServiceSpec.groovy
@@ -24,8 +24,6 @@
 import org.onap.cps.api.CpsModuleService
 import org.onap.cps.ncmp.api.impl.operations.DmiModelOperations
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
-import org.onap.cps.ncmp.api.inventory.CmHandleState
-import org.onap.cps.ncmp.api.inventory.CompositeState
 import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle
 import org.onap.cps.spi.CascadeDeleteAllowed
 import org.onap.cps.spi.exceptions.SchemaSetNotFoundException
@@ -34,7 +32,6 @@
 
 class ModuleSyncServiceSpec extends Specification {
 
-
     def mockCpsModuleService = Mock(CpsModuleService)
     def mockDmiModelOperations = Mock(DmiModelOperations)
     def mockCpsAdminService = Mock(CpsAdminService)
@@ -72,38 +69,27 @@
     }
 
     def 'Delete Schema Set for CmHandle' () {
-        given: 'a CmHandle in the advised state'
-            def cmHandle = new YangModelCmHandle(id: 'some-cmhandle-id', compositeState: new CompositeState(cmHandleState: CmHandleState.ADVISED))
-        and: 'the Schema Set exists for the CmHandle'
-            1 * mockCpsModuleService.deleteSchemaSet(_ as String, 'some-cmhandle-id',
-                CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED)
         when: 'delete schema set if exists is called'
-            objectUnderTest.deleteSchemaSetIfExists(cmHandle)
-        then: 'there are no exceptions'
-            noExceptionThrown()
+            objectUnderTest.deleteSchemaSetIfExists('some-cmhandle-id')
+        then: 'the module service is invoked to delete the correct schema set'
+            1 * mockCpsModuleService.deleteSchemaSet(expectedDataspaceName, 'some-cmhandle-id', CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED)
     }
 
     def 'Delete a non-existing Schema Set for CmHandle' () {
-        given: 'a CmHandle in the advised state'
-            def cmHandle = new YangModelCmHandle(id: 'some-cmhandle-id', compositeState: new CompositeState(cmHandleState: CmHandleState.ADVISED))
-        and: 'the DB throws an exception because its Schema Set does not exist'
-            1 * mockCpsModuleService.deleteSchemaSet(_ as String, 'some-cmhandle-id',
-                CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED) >> { throw new SchemaSetNotFoundException('some-dataspace-name', 'some-cmhandle-id') }
+        given: 'the DB throws an exception because its Schema Set does not exist'
+           mockCpsModuleService.deleteSchemaSet(*_) >> { throw new SchemaSetNotFoundException('some-dataspace-name', 'some-cmhandle-id') }
         when: 'delete schema set if exists is called'
-            objectUnderTest.deleteSchemaSetIfExists(cmHandle)
-        then: 'there are no exceptions'
+            objectUnderTest.deleteSchemaSetIfExists('some-cmhandle-id')
+        then: 'the exception from the DB is ignored; there are no exceptions'
             noExceptionThrown()
     }
 
     def 'Delete Schema Set for CmHandle with other exception' () {
-        given: 'a CmHandle in the advised state'
-            def cmHandle = new YangModelCmHandle(id: 'some-cmhandle-id', compositeState: new CompositeState(cmHandleState: CmHandleState.ADVISED))
-        and: 'an exception other than SchemaSetNotFoundException is thrown'
+        given: 'an exception other than SchemaSetNotFoundException is thrown'
             UnsupportedOperationException unsupportedOperationException = new UnsupportedOperationException();
-            1 * mockCpsModuleService.deleteSchemaSet(_ as String, 'some-cmhandle-id',
-                CascadeDeleteAllowed.CASCADE_DELETE_ALLOWED) >> { throw unsupportedOperationException }
+            1 * mockCpsModuleService.deleteSchemaSet(*_) >> { throw unsupportedOperationException }
         when: 'delete schema set if exists is called'
-            objectUnderTest.deleteSchemaSetIfExists(cmHandle)
+            objectUnderTest.deleteSchemaSetIfExists('some-cmhandle-id')
         then: 'an exception is thrown'
             def result = thrown(UnsupportedOperationException)
             result == unsupportedOperationException
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy
new file mode 100644
index 0000000..291ba96
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy
@@ -0,0 +1,110 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2022 Nordix Foundation
+ *  Modifications Copyright (C) 2022 Bell Canada
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.inventory.sync
+
+import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler
+import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
+import org.onap.cps.ncmp.api.inventory.CmHandleState
+import org.onap.cps.ncmp.api.inventory.CompositeState
+import org.onap.cps.ncmp.api.inventory.CompositeStateBuilder
+import org.onap.cps.ncmp.api.inventory.InventoryPersistence
+import org.onap.cps.ncmp.api.inventory.LockReasonCategory
+import org.onap.cps.spi.model.DataNode
+import spock.lang.Specification
+
+class ModuleSyncTasksSpec extends Specification {
+
+    def mockInventoryPersistence = Mock(InventoryPersistence)
+
+    def mockSyncUtils = Mock(SyncUtils)
+
+    def mockModuleSyncService = Mock(ModuleSyncService)
+
+    def mockLcmEventsCmHandleStateHandler = Mock(LcmEventsCmHandleStateHandler)
+
+    def objectUnderTest = new ModuleSyncTasks(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService, mockLcmEventsCmHandleStateHandler)
+
+    def 'Module Sync ADVISED cm handles.'() {
+        given: 'cm handles in an ADVISED state'
+            def cmHandle1 = advisedCmHandleAsDataNode('cm-handle-1')
+            def cmHandle2 = advisedCmHandleAsDataNode('cm-handle-2')
+        and: 'the inventory persistence cm handle returns a ADVISED state for the any handle'
+            mockInventoryPersistence.getCmHandleState(_) >> new CompositeState(cmHandleState: CmHandleState.ADVISED)
+        when: 'module sync poll is executed'
+            objectUnderTest.performModuleSync([cmHandle1, cmHandle2])
+        then: 'module sync service deletes schemas set of each cm handle if it already exists'
+            1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-1')
+            1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-2')
+        and: 'module sync service is invoked for each cm handle'
+            1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args,'cm-handle-1') }
+            1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args,'cm-handle-2') }
+        and: 'the state handler is called for the both cm handles'
+            2 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.READY)
+    }
+
+    def 'Module Sync ADVISED cm handle with failure during sync.'() {
+        given: 'a cm handle in an ADVISED state'
+            def cmHandle = advisedCmHandleAsDataNode('cm-handle')
+        and: 'the inventory persistence cm handle returns a ADVISED state for the cm handle'
+            def cmHandleState = new CompositeState(cmHandleState: CmHandleState.ADVISED)
+            1 * mockInventoryPersistence.getCmHandleState('cm-handle') >> cmHandleState
+        and: 'module sync service attempts to sync the cm handle and throws an exception'
+            1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(*_) >> { throw new Exception('some exception') }
+        when: 'module sync is executed'
+            objectUnderTest.performModuleSync([cmHandle])
+        then: 'update lock reason, details and attempts is invoked'
+            1 * mockSyncUtils.updateLockReasonDetailsAndAttempts(cmHandleState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED ,'some exception')
+        and: 'the state handler is called to update the state to LOCKED'
+            1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.LOCKED)
+    }
+
+    def 'Reset failed CM Handles #scenario.'() {
+        given: 'cm handles in an locked state'
+            def lockedState = new CompositeStateBuilder().withCmHandleState(CmHandleState.LOCKED)
+                    .withLockReason(LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, '').withLastUpdatedTimeNow().build()
+            def yangModelCmHandle1 = new YangModelCmHandle(id: 'cm-handle-1', compositeState: lockedState)
+            def yangModelCmHandle2 = new YangModelCmHandle(id: 'cm-handle-2', compositeState: lockedState)
+        and: 'sync utils retry locked cm handle returns #isReadyForRetry'
+            mockSyncUtils.isReadyForRetry(lockedState) >>> isReadyForRetry
+        when: 'resetting failed cm handles'
+            objectUnderTest.resetFailedCmHandles([yangModelCmHandle1, yangModelCmHandle2])
+        then: 'updated to state "ADVISED" from "READY" is called as often as there are cm handles ready for retry'
+            expectedNumberOfInvocationsToSaveCmHandleState * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.ADVISED)
+        where:
+            scenario                        | isReadyForRetry         || expectedNumberOfInvocationsToSaveCmHandleState
+            'retry locked cm handle once'   | [true, false]           || 1
+            'retry locked cm handle twice'  | [true, true]            || 2
+            'do not retry locked cm handle' | [false, false]          || 0
+    }
+
+    def advisedCmHandleAsDataNode(cmHandleId) {
+        return new DataNode(anchorName:cmHandleId, leaves:['id':cmHandleId, 'cm-handle-state':'ADVISED'])
+    }
+
+    def assertYamgModelCmHandleArgument(args, expectedCmHandleId) {
+        {
+            def yangModelCmHandle = args[0]
+            assert yangModelCmHandle.id == expectedCmHandleId
+        }
+        return true
+    }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
index 81268cb..43f492d 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
@@ -21,98 +21,56 @@
 
 package org.onap.cps.ncmp.api.inventory.sync
 
-
-import org.onap.cps.ncmp.api.impl.event.lcm.LcmEventsCmHandleStateHandler
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
-import org.onap.cps.ncmp.api.inventory.CmHandleState
-import org.onap.cps.ncmp.api.inventory.CompositeState
-import org.onap.cps.ncmp.api.inventory.InventoryPersistence
-import org.onap.cps.ncmp.api.inventory.LockReasonCategory
-import org.onap.cps.ncmp.api.inventory.CompositeStateBuilder
-import spock.lang.Specification
 
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.ConcurrentMap
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.BlockingQueue
+import org.onap.cps.spi.model.DataNode
+import spock.lang.Specification
 
 class ModuleSyncWatchdogSpec extends Specification {
 
-    def mockInventoryPersistence = Mock(InventoryPersistence)
-
     def mockSyncUtils = Mock(SyncUtils)
 
-    def mockModuleSyncService = Mock(ModuleSyncService)
+    def static testQueueCapacity = 50 + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE
 
-    def stubbedMap = Stub(ConcurrentMap)
+    BlockingQueue<DataNode> moduleSyncWorkQueue = new ArrayBlockingQueue(testQueueCapacity)
 
-    def mockLcmEventsCmHandleStateHandler = Mock(LcmEventsCmHandleStateHandler)
+    def moduleSyncStartedOnCmHandles = [:]
 
-    def cmHandleState = CmHandleState.ADVISED
+    def mockModuleSyncTasks = Mock(ModuleSyncTasks)
 
-    def objectUnderTest = new ModuleSyncWatchdog(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService, stubbedMap as ConcurrentHashMap, mockLcmEventsCmHandleStateHandler)
+    def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , moduleSyncStartedOnCmHandles, mockModuleSyncTasks)
 
-    def 'Schedule a Cm-Handle Sync for ADVISED Cm-Handles'() {
-        given: 'cm handles in an advised state and a data sync state'
-            def compositeState1 = new CompositeState(cmHandleState: cmHandleState)
-            def compositeState2 = new CompositeState(cmHandleState: cmHandleState)
-            def yangModelCmHandle1 = new YangModelCmHandle(id: 'some-cm-handle', compositeState: compositeState1)
-            def yangModelCmHandle2 = new YangModelCmHandle(id: 'some-cm-handle-2', compositeState: compositeState2)
-        and: 'sync utilities return a cm handle twice'
-            mockSyncUtils.getAdvisedCmHandles() >> [yangModelCmHandle1, yangModelCmHandle2]
-        when: 'module sync poll is executed'
-            objectUnderTest.executeAdvisedCmHandlePoll()
-        then: 'the inventory persistence cm handle returns a composite state for the first cm handle'
-            1 * mockInventoryPersistence.getCmHandleState('some-cm-handle') >> compositeState1
-        and: 'module sync service deletes schema set of cm handle if it exists'
-            1 * mockModuleSyncService.deleteSchemaSetIfExists(yangModelCmHandle1)
-        and: 'module sync service syncs the first cm handle and creates a schema set'
-            1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle1)
-        then: 'the state handler is called for the first cm handle'
-            1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle1, CmHandleState.READY)
-        and: 'the inventory persistence cm handle returns a composite state for the second cm handle'
-            mockInventoryPersistence.getCmHandleState('some-cm-handle-2') >> compositeState2
-        and: 'module sync service syncs the second cm handle and creates a schema set'
-            1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle2)
-        then: 'the state handler is called for the second cm handle'
-            1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle2, CmHandleState.READY)
-    }
-
-    def 'Schedule a Cm-Handle Sync for ADVISED Cm-Handle with failure'() {
-        given: 'cm handles in an advised state'
-            def compositeState = new CompositeState(cmHandleState: cmHandleState)
-            def yangModelCmHandle = new YangModelCmHandle(id: 'some-cm-handle', compositeState: compositeState)
-        and: 'sync utilities return a cm handle'
-            mockSyncUtils.getAdvisedCmHandles() >> [yangModelCmHandle]
-        when: 'module sync poll is executed'
-            objectUnderTest.executeAdvisedCmHandlePoll()
-        then: 'the inventory persistence cm handle returns a composite state for the cm handle'
-            1 * mockInventoryPersistence.getCmHandleState('some-cm-handle') >> compositeState
-        and: 'module sync service attempts to sync the cm handle and throws an exception'
-            1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(*_) >> { throw new Exception('some exception') }
-        and: 'update lock reason, details and attempts is invoked'
-            1 * mockSyncUtils.updateLockReasonDetailsAndAttempts(compositeState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED ,'some exception')
-        and: 'the state handler is called to update the state to LOCKED'
-            1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle, CmHandleState.LOCKED)
-    }
-
-    def 'Schedule a Cm-Handle Sync with condition #scenario '() {
-        given: 'cm handles in an locked state'
-            def compositeState = new CompositeStateBuilder().withCmHandleState(CmHandleState.LOCKED)
-                    .withLockReason(LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, '').withLastUpdatedTimeNow().build()
-            def yangModelCmHandle = new YangModelCmHandle(id: 'some-cm-handle', compositeState: compositeState)
-        and: 'sync utilities return a cm handle twice'
-            mockSyncUtils.getModuleSyncFailedCmHandles() >> [yangModelCmHandle, yangModelCmHandle]
-        and: 'inventory persistence returns the composite state of the cm handle'
-            mockInventoryPersistence.getCmHandleState(yangModelCmHandle.getId()) >> compositeState
-        and: 'sync utils retry locked cm handle returns #isReadyForRetry'
-            mockSyncUtils.isReadyForRetry(compositeState) >>> isReadyForRetry
-        when: 'module sync poll is executed'
-            objectUnderTest.executeLockedCmHandlePoll()
-        then: 'the first cm handle is updated to state "ADVISED" from "READY"'
-            expectedNumberOfInvocationsToSaveCmHandleState * mockLcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle, CmHandleState.ADVISED)
+    def 'Module sync #scenario , #numberOfAdvisedCmHandles advised cm handles.'() {
+        given: 'sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
+            mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles)
+        when: ' module sync is started'
+            objectUnderTest.moduleSyncAdvisedCmHandles()
+        then: 'it performs #expectedNumberOfTaskExecutions tasks'
+            expectedNumberOfTaskExecutions * mockModuleSyncTasks.performModuleSync(_)
         where:
-            scenario                        | isReadyForRetry         || expectedNumberOfInvocationsToSaveCmHandleState
-            'retry locked cm handle once'   | [true, false]           || 1
-            'retry locked cm handle twice'  | [true, true]            || 2
-            'do not retry locked cm handle' | [false, false]          || 0
+            scenario              |  numberOfAdvisedCmHandles                                         || expectedNumberOfTaskExecutions
+            'less then 1 batch'   | 1                                                                 || 1
+            'exactly 1 batch'     | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE                         || 1
+            '2 batches'           | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE                     || 2
+            'queue capacity'      | testQueueCapacity                                                 || 3
+            'over queue capacity' | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3
+    }
+
+    def 'Reset failed cm handles.'() {
+        given: 'sync utilities returns failed cm handles'
+            def failedCmHandles = [new YangModelCmHandle()]
+            mockSyncUtils.getModuleSyncFailedCmHandles() >> failedCmHandles
+        when: ' reset failed cm handles is started'
+            objectUnderTest.resetPreviouslyFailedCmHandles()
+        then: 'it is delegated to the module sync task (service)'
+            1 * mockModuleSyncTasks.resetFailedCmHandles(failedCmHandles)
+    }
+
+    def createDataNodes(numberOfDataNodes) {
+        def dataNodes = []
+        (1..numberOfDataNodes).each {dataNodes.add(new DataNode())}
+        return dataNodes
     }
 }
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy
index 52fb110..6ccdcf1 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy
@@ -47,8 +47,6 @@
 
 class SyncUtilsSpec extends Specification{
 
-    def mockInventoryPersistence = Mock(InventoryPersistence)
-
     def mockCmHandleQueries = Mock(CmHandleQueries)
 
     def mockDmiDataOperations = Mock(DmiDataOperations)
@@ -63,28 +61,14 @@
     @Shared
     def dataNode = new DataNode(leaves: ['id': 'cm-handle-123'])
 
-    @Shared
-    def dataNodeAdditionalProperties = new DataNode(leaves: ['name': 'dmiProp1', 'value': 'dmiValue1'])
-
 
     def 'Get an advised Cm-Handle where ADVISED cm handle #scenario'() {
         given: 'the inventory persistence service returns a collection of data nodes'
             mockCmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED) >> dataNodeCollection
-        and: 'we have some additional (dmi, private) properties'
-            dataNodeAdditionalProperties.xpath = dataNode.xpath + '/additional-properties[@name="dmiProp1"]'
-            dataNode.childDataNodes = [dataNodeAdditionalProperties]
         when: 'get advised cm handles are fetched'
             def yangModelCmHandles = objectUnderTest.getAdvisedCmHandles()
         then: 'the returned data node collection is the correct size'
             yangModelCmHandles.size() == expectedDataNodeSize
-        and: 'if there is a data node the additional (dmi, private) properties are included'
-            if (expectedDataNodeSize > 0) {
-                assert yangModelCmHandles[0].dmiProperties[0].name == 'dmiProp1'
-                assert yangModelCmHandles[0].dmiProperties[0].value == 'dmiValue1'
-            }
-        and: 'yang model collection contains the correct data'
-            yangModelCmHandles.stream().map(yangModel -> yangModel.id).collect(Collectors.toSet()) ==
-                dataNodeCollection.stream().map(dataNode -> dataNode.leaves.get("id")).collect(Collectors.toSet())
         where: 'the following scenarios are used'
             scenario         | dataNodeCollection || expectedCallsToGetYangModelCmHandle | expectedDataNodeSize
             'exists'         | [dataNode]         || 1                                   | 1
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/YangDataConverterSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/YangDataConverterSpec.groovy
new file mode 100644
index 0000000..20d384f
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/YangDataConverterSpec.groovy
@@ -0,0 +1,42 @@
+/*
+ * ============LICENSE_START========================================================
+ *  Copyright (C) 2022 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.utils
+
+import org.onap.cps.ncmp.api.impl.utils.YangDataConverter
+import org.onap.cps.spi.model.DataNode
+import spock.lang.Specification
+
+class YangDataConverterSpec extends Specification{
+
+    def 'Convert a cm handle data node with private properties.'() {
+        given: 'a datanode with some additional (dmi, private) properties'
+            def dataNodeAdditionalProperties = new DataNode(xpath:'/additional-properties[@name="dmiProp1"]',
+                leaves: ['name': 'dmiProp1', 'value': 'dmiValue1'])
+            def dataNode = new DataNode(childDataNodes:[dataNodeAdditionalProperties])
+        when: 'the dataNode is converted'
+            def yangModelCmHandle = YangDataConverter.convertCmHandleToYangModel(dataNode,'sample-id')
+        then: 'the converted object has the correct id'
+            assert yangModelCmHandle.id == 'sample-id'
+        and: 'the additional (dmi, private) properties are included'
+            assert yangModelCmHandle.dmiProperties[0].name == 'dmiProp1'
+            assert yangModelCmHandle.dmiProperties[0].value == 'dmiValue1'
+    }
+}
diff --git a/cps-service/src/main/java/org/onap/cps/spi/model/DataNode.java b/cps-service/src/main/java/org/onap/cps/spi/model/DataNode.java
index d80306b..c77daaf 100644
--- a/cps-service/src/main/java/org/onap/cps/spi/model/DataNode.java
+++ b/cps-service/src/main/java/org/onap/cps/spi/model/DataNode.java
@@ -22,6 +22,7 @@
 
 package org.onap.cps.spi.model;
 
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -33,7 +34,9 @@
 @Setter(AccessLevel.PROTECTED)
 @Getter
 @EqualsAndHashCode
-public class DataNode {
+public class DataNode implements Serializable {
+
+    private static final long serialVersionUID = 1482619410918597467L;
 
     DataNode() {    }