Performance Improvement: Watchdog Parallel execution with configuration

- Introduced AsyncTaskExecutor to accept tasks and execute them with a
  configured number of parallel threads.
- The number of parallel threads can be configured in application.yml.
- AsyncTaskExecutorSpec is added
- Fixed existing Groovy tests now that tasks are submitted asynchronously.

Issue-ID: CPS-1200
Change-Id: I58c0368b945c90e619c2acfc7458ba58de047484
Signed-off-by: ToineSiebelink <>
Signed-off-by: sourabh_sourabh <>
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy
index 291ba96..a233996 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasksSpec.groovy
@@ -30,6 +30,7 @@
 import org.onap.cps.ncmp.api.inventory.LockReasonCategory
 import org.onap.cps.spi.model.DataNode
 import spock.lang.Specification
+import java.util.concurrent.atomic.AtomicInteger
 class ModuleSyncTasksSpec extends Specification {
@@ -41,6 +42,8 @@
     def mockLcmEventsCmHandleStateHandler = Mock(LcmEventsCmHandleStateHandler)
+    def batchCount = new AtomicInteger(5)
     def objectUnderTest = new ModuleSyncTasks(mockInventoryPersistence, mockSyncUtils, mockModuleSyncService, mockLcmEventsCmHandleStateHandler)
     def 'Module Sync ADVISED cm handles.'() {
@@ -50,15 +53,17 @@
         and: 'the inventory persistence cm handle returns a ADVISED state for the any handle'
             mockInventoryPersistence.getCmHandleState(_) >> new CompositeState(cmHandleState: CmHandleState.ADVISED)
         when: 'module sync poll is executed'
-            objectUnderTest.performModuleSync([cmHandle1, cmHandle2])
+            objectUnderTest.performModuleSync([cmHandle1, cmHandle2], batchCount)
         then: 'module sync service deletes schemas set of each cm handle if it already exists'
             1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-1')
             1 * mockModuleSyncService.deleteSchemaSetIfExists('cm-handle-2')
         and: 'module sync service is invoked for each cm handle'
-            1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args,'cm-handle-1') }
-            1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args,'cm-handle-2') }
+            1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args, 'cm-handle-1') }
+            1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(_) >> { args -> assertYamgModelCmHandleArgument(args, 'cm-handle-2') }
         and: 'the state handler is called for the both cm handles'
             2 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.READY)
+        and: 'batch count is decremented by one'
+            assert batchCount.get() == 4
     def 'Module Sync ADVISED cm handle with failure during sync.'() {
@@ -70,11 +75,13 @@
         and: 'module sync service attempts to sync the cm handle and throws an exception'
             1 * mockModuleSyncService.syncAndCreateSchemaSetAndAnchor(*_) >> { throw new Exception('some exception') }
         when: 'module sync is executed'
-            objectUnderTest.performModuleSync([cmHandle])
+            objectUnderTest.performModuleSync([cmHandle], batchCount)
         then: 'update lock reason, details and attempts is invoked'
-            1 * mockSyncUtils.updateLockReasonDetailsAndAttempts(cmHandleState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED ,'some exception')
+            1 * mockSyncUtils.updateLockReasonDetailsAndAttempts(cmHandleState, LockReasonCategory.LOCKED_MODULE_SYNC_FAILED, 'some exception')
         and: 'the state handler is called to update the state to LOCKED'
             1 * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.LOCKED)
+        and: 'batch count is decremented by one'
+            assert batchCount.get() == 4
     def 'Reset failed CM Handles #scenario.'() {
@@ -90,14 +97,14 @@
         then: 'updated to state "ADVISED" from "READY" is called as often as there are cm handles ready for retry'
             expectedNumberOfInvocationsToSaveCmHandleState * mockLcmEventsCmHandleStateHandler.updateCmHandleState(_, CmHandleState.ADVISED)
-            scenario                        | isReadyForRetry         || expectedNumberOfInvocationsToSaveCmHandleState
-            'retry locked cm handle once'   | [true, false]           || 1
-            'retry locked cm handle twice'  | [true, true]            || 2
-            'do not retry locked cm handle' | [false, false]          || 0
+            scenario                        | isReadyForRetry || expectedNumberOfInvocationsToSaveCmHandleState
+            'retry locked cm handle once'   | [true, false]   || 1
+            'retry locked cm handle twice'  | [true, true]    || 2
+            'do not retry locked cm handle' | [false, false]  || 0
     def advisedCmHandleAsDataNode(cmHandleId) {
-        return new DataNode(anchorName:cmHandleId, leaves:['id':cmHandleId, 'cm-handle-state':'ADVISED'])
+        return new DataNode(anchorName: cmHandleId, leaves: ['id': cmHandleId, 'cm-handle-state': 'ADVISED'])
     def assertYamgModelCmHandleArgument(args, expectedCmHandleId) {
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
index 43f492d..e5240c0 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
@@ -22,7 +22,7 @@
 package org.onap.cps.ncmp.api.inventory.sync
 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle
+import org.onap.cps.ncmp.api.inventory.sync.executor.AsyncTaskExecutor
 import java.util.concurrent.ArrayBlockingQueue
 import java.util.concurrent.BlockingQueue
 import org.onap.cps.spi.model.DataNode
@@ -34,28 +34,38 @@
     def static testQueueCapacity = 50 + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE
-    BlockingQueue<DataNode> moduleSyncWorkQueue = new ArrayBlockingQueue(testQueueCapacity)
+    def moduleSyncWorkQueue = new ArrayBlockingQueue(testQueueCapacity)
     def moduleSyncStartedOnCmHandles = [:]
     def mockModuleSyncTasks = Mock(ModuleSyncTasks)
-    def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , moduleSyncStartedOnCmHandles, mockModuleSyncTasks)
+    def spiedAsyncTaskExecutor = Spy(AsyncTaskExecutor)
-    def 'Module sync #scenario , #numberOfAdvisedCmHandles advised cm handles.'() {
+    def objectUnderTest = new ModuleSyncWatchdog(mockSyncUtils, moduleSyncWorkQueue , moduleSyncStartedOnCmHandles,
+            mockModuleSyncTasks, spiedAsyncTaskExecutor)
+    void setup() {
+        spiedAsyncTaskExecutor.setupThreadPool();
+    }
+    def 'Module sync advised cm handles with #scenario.'() {
         given: 'sync utilities returns #numberOfAdvisedCmHandles advised cm handles'
             mockSyncUtils.getAdvisedCmHandles() >> createDataNodes(numberOfAdvisedCmHandles)
+        and: 'the executor has #parallelismLevel available threads'
+            spiedAsyncTaskExecutor.getAsyncTaskParallelismLevel() >> parallelismLevel
         when: ' module sync is started'
         then: 'it performs #expectedNumberOfTaskExecutions tasks'
-            expectedNumberOfTaskExecutions * mockModuleSyncTasks.performModuleSync(_)
-        where:
-            scenario              |  numberOfAdvisedCmHandles                                         || expectedNumberOfTaskExecutions
-            'less then 1 batch'   | 1                                                                 || 1
-            'exactly 1 batch'     | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE                         || 1
-            '2 batches'           | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE                     || 2
-            'queue capacity'      | testQueueCapacity                                                 || 3
-            'over queue capacity' | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3
+            expectedNumberOfTaskExecutions * spiedAsyncTaskExecutor.executeTask(*_)
+        where: ' the following parameter are used'
+            scenario              | parallelismLevel | numberOfAdvisedCmHandles                                          || expectedNumberOfTaskExecutions
+            'less then 1 batch'   | 9                | 1                                                                 || 1
+            'exactly 1 batch'     | 9                | ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE                         || 1
+            '2 batches'           | 9                | 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE                     || 2
+            'queue capacity'      | 9                | testQueueCapacity                                                 || 3
+            'over queue capacity' | 9                | testQueueCapacity + 2 * ModuleSyncWatchdog.MODULE_SYNC_BATCH_SIZE || 3
+            'not enough threads'  | 2                | testQueueCapacity                                                 || 2
     def 'Reset failed cm handles.'() {
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutorSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutorSpec.groovy
new file mode 100644
index 0000000..ba1820e
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/executor/AsyncTaskExecutorSpec.groovy
@@ -0,0 +1,61 @@
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2022 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+package org.onap.cps.ncmp.api.inventory.sync.executor
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import spock.lang.Specification
+import java.util.concurrent.TimeoutException
+import java.util.function.Supplier
+@SpringBootTest(classes = AsyncTaskExecutor)
+class AsyncTaskExecutorSpec extends Specification {
+    @Autowired
+    AsyncTaskExecutor objectUnderTest
+    def mockTaskSupplier = Mock(Supplier<Object>)
+    def 'Parallelism level configuration.'() {
+        expect: 'Parallelism level is configured with the correct value'
+            assert objectUnderTest.getAsyncTaskParallelismLevel() == 3
+    }
+    def 'Task completion with #caseDescriptor.'() {
+        when: 'task completion is handled'
+            def irrelevantResponse = null
+            objectUnderTest.handleTaskCompletion(irrelevantResponse, exception);
+        then: 'any exception is swallowed by the task completion (logged)'
+            noExceptionThrown()
+        where: 'following cases are tested'
+            caseDescriptor         | exception
+            'no exception'         | null
+            'time out exception'   | new TimeoutException("time-out")
+            'unexpected exception' | new Exception("some exception")
+    }
+    def 'Task execution.'() {
+        when: 'a task is submitted for execution'
+            objectUnderTest.executeTask(() -> mockTaskSupplier, 0)
+        then: 'the task submission is successful'
+            noExceptionThrown()
+    }
diff --git a/cps-ncmp-service/src/test/resources/application.yml b/cps-ncmp-service/src/test/resources/application.yml
index c23926e..03d70c26 100644
--- a/cps-ncmp-service/src/test/resources/application.yml
+++ b/cps-ncmp-service/src/test/resources/application.yml
@@ -23,3 +23,6 @@
         base-path: dmi
+    async-executor:
+        parallelism-level: 3
\ No newline at end of file