CmHandle creation performance degradation

- Created a dedicated threadpool for scheduler.
- Tuned async threadpool of notification executor from setting RejectedExecutionHandler and application.yml.

Issue-ID: CPS-1126
Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
Change-Id: I2afe3c76c1aec78751777df0d2f08ddb8dcee102
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml
index def006c..dd83795 100644
--- a/cps-application/src/main/resources/application.yml
+++ b/cps-application/src/main/resources/application.yml
@@ -100,8 +100,8 @@
             enabled-dataspaces: ${NOTIFICATION_DATASPACE_FILTER_PATTERNS:""}

     async:

         executor:

-            core-pool-size: 2

-            max-pool-size: 10

+            core-pool-size: 10

+            max-pool-size: 100

             queue-capacity: 500

             wait-for-tasks-to-complete-on-shutdown: true

             thread-name-prefix: Async-

diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/NcmpConfiguration.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/NcmpConfiguration.java
index af33651..a418155 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/NcmpConfiguration.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/NcmpConfiguration.java
@@ -32,11 +32,9 @@
 import org.springframework.context.annotation.Scope;
 import org.springframework.http.MediaType;
 import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
-import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.stereotype.Component;
 import org.springframework.web.client.RestTemplate;
 
-@EnableScheduling
 @Configuration
 @RequiredArgsConstructor(access = AccessLevel.PROTECTED)
 public class NcmpConfiguration {
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
index f18d843..3f81194 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java
@@ -54,8 +54,7 @@
      */
     @Scheduled(fixedDelayString = "${timers.advised-modules-sync.sleep-time-ms:30000}")
     public void executeAdvisedCmHandlePoll() {
-        YangModelCmHandle advisedCmHandle = syncUtils.getAnAdvisedCmHandle();
-        while (advisedCmHandle != null) {
+        syncUtils.getAdvisedCmHandles().stream().forEach(advisedCmHandle -> {
             final String cmHandleId = advisedCmHandle.getId();
             final CompositeState compositeState = inventoryPersistence.getCmHandleState(cmHandleId);
             try {
@@ -69,8 +68,7 @@
             }
             inventoryPersistence.saveCmHandleState(cmHandleId, compositeState);
             log.debug("{} is now in {} state", cmHandleId, compositeState.getCmHandleState().name());
-            advisedCmHandle = syncUtils.getAnAdvisedCmHandle();
-        }
+        });
         log.debug("No Cm-Handles currently found in an ADVISED state");
     }
 
@@ -85,7 +83,7 @@
             final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState);
             if (isReadyForRetry) {
                 setCompositeStateToAdvisedAndRetainOldLockReasonDetails(compositeState);
-                log.debug("Locked cm handle {} is being resynced", lockedCmHandle.getId());
+                log.debug("Locked cm handle {} is being re-synced", lockedCmHandle.getId());
                 inventoryPersistence.saveCmHandleState(lockedCmHandle.getId(), compositeState);
             }
         }
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
index 2b80b9d..467fd8f 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/SyncUtils.java
@@ -22,10 +22,10 @@
 package org.onap.cps.ncmp.api.inventory.sync;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import java.security.SecureRandom;
 import java.time.Duration;
 import java.time.OffsetDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -55,9 +55,6 @@
 @Service
 @RequiredArgsConstructor
 public class SyncUtils {
-
-    private static final SecureRandom secureRandom = new SecureRandom();
-
     private final InventoryPersistence inventoryPersistence;
 
     private final DmiDataOperations dmiDataOperations;
@@ -69,17 +66,17 @@
     /**
      * Query data nodes for cm handles with an "ADVISED" cm handle state, and select a random entry for processing.
      *
-     * @return a random yang model cm handle with an ADVISED state, return null if not found
+     * @return a randomized yang model cm handle list with ADVISED state, return empty list if not found
      */
-    public YangModelCmHandle getAnAdvisedCmHandle() {
-        final List<DataNode> advisedCmHandles = inventoryPersistence.getCmHandlesByState(CmHandleState.ADVISED);
-        if (advisedCmHandles.isEmpty()) {
-            return null;
+    public List<YangModelCmHandle> getAdvisedCmHandles() {
+        final List<DataNode> advisedCmHandlesAsDataNodeList = new ArrayList<>(
+                inventoryPersistence.getCmHandlesByState(CmHandleState.ADVISED));
+        log.info("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodeList.size());
+        if (advisedCmHandlesAsDataNodeList.isEmpty()) {
+            return Collections.emptyList();
         }
-        final int randomElementIndex = secureRandom.nextInt(advisedCmHandles.size());
-        final String cmHandleId = advisedCmHandles.get(randomElementIndex).getLeaves()
-            .get("id").toString();
-        return inventoryPersistence.getYangModelCmHandle(cmHandleId);
+        Collections.shuffle(advisedCmHandlesAsDataNodeList);
+        return convertCmHandlesDataNodesToYangModelCmHandles(advisedCmHandlesAsDataNodeList);
     }
 
     /**
@@ -113,12 +110,10 @@
      * @return a random LOCKED yang model cm handle, return null if not found
      */
     public List<YangModelCmHandle> getModuleSyncFailedCmHandles() {
-        final List<DataNode> lockedCmHandleAsDataNodeList = inventoryPersistence.getCmHandleDataNodesByCpsPath(
+        final List<DataNode> lockedCmHandlesAsDataNodeList = inventoryPersistence.getCmHandleDataNodesByCpsPath(
             "//lock-reason[@reason=\"LOCKED_MODULE_SYNC_FAILED\"]/ancestor::cm-handles",
             FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS);
-        return lockedCmHandleAsDataNodeList.stream()
-            .map(cmHandle -> YangDataConverter.convertCmHandleToYangModel(cmHandle,
-                cmHandle.getLeaves().get("id").toString())).collect(Collectors.toList());
+        return convertCmHandlesDataNodesToYangModelCmHandles(lockedCmHandlesAsDataNodeList);
     }
 
     /**
@@ -191,4 +186,10 @@
         final Map.Entry<String, JsonNode> firstElement = overallJsonTreeMap.next();
         return jsonObjectMapper.asJsonString(Map.of(firstElement.getKey(), firstElement.getValue()));
     }
+
+    private List<YangModelCmHandle> convertCmHandlesDataNodesToYangModelCmHandles(
+            final List<DataNode> cmHandlesAsDataNodeList) {
+        return cmHandlesAsDataNodeList.stream().map(dataNode -> YangDataConverter.convertCmHandleToYangModel(dataNode,
+                dataNode.getLeaves().get("id").toString())).collect(Collectors.toList());
+    }
 }
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurer.java
new file mode 100644
index 0000000..196a655
--- /dev/null
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurer.java
@@ -0,0 +1,56 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2022 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.inventory.sync.config;
+
+import java.util.concurrent.ThreadPoolExecutor;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.SchedulingConfigurer;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+import org.springframework.scheduling.config.ScheduledTaskRegistrar;
+
+@Configuration
+@EnableScheduling
+public class WatchdogSchedulingConfigurer implements SchedulingConfigurer {
+
+    @Override
+    public void configureTasks(final ScheduledTaskRegistrar scheduledTaskRegistrar) {
+        scheduledTaskRegistrar.setTaskScheduler(taskScheduler());
+    }
+
+    /**
+     * Implementation of Spring's {@link TaskScheduler} interface, wrapping
+     * a native {@link org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler} for watchdogs.
+     */
+    @Bean
+    public TaskScheduler taskScheduler() {
+        final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
+        taskScheduler.setPoolSize(10);
+        taskScheduler.setThreadNamePrefix("watchdog-th-");
+        taskScheduler.setAwaitTerminationSeconds(60);
+        taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
+        taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+        taskScheduler.initialize();
+        return taskScheduler;
+    }
+}
\ No newline at end of file
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
index 4b92be3..40a0e39 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdogSpec.groovy
@@ -50,7 +50,7 @@
             def yangModelCmHandle2 = new YangModelCmHandle(id: 'some-cm-handle-2', compositeState: compositeState2)
             objectUnderTest.isGlobalDataSyncCacheEnabled = dataSyncCacheEnabled
         and: 'sync utilities return a cm handle twice'
-            mockSyncUtils.getAnAdvisedCmHandle() >>> [yangModelCmHandle1, yangModelCmHandle2, null]
+            mockSyncUtils.getAdvisedCmHandles() >> [yangModelCmHandle1, yangModelCmHandle2]
         when: 'module sync poll is executed'
             objectUnderTest.executeAdvisedCmHandlePoll()
         then: 'the inventory persistence cm handle returns a composite state for the first cm handle'
@@ -84,7 +84,7 @@
             def compositeState = new CompositeState(cmHandleState: cmHandleState)
             def yangModelCmHandle = new YangModelCmHandle(id: 'some-cm-handle', compositeState: compositeState)
         and: 'sync utilities return a cm handle'
-            mockSyncUtils.getAnAdvisedCmHandle() >>> [yangModelCmHandle, null]
+            mockSyncUtils.getAdvisedCmHandles() >> [yangModelCmHandle]
         when: 'module sync poll is executed'
             objectUnderTest.executeAdvisedCmHandlePoll()
         then: 'the inventory persistence cm handle returns a composite state for the cm handle'
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy
index 134ee38..6c2d8f1 100644
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/SyncUtilsSpec.groovy
@@ -41,6 +41,7 @@
 
 import java.time.OffsetDateTime
 import java.time.format.DateTimeFormatter
+import java.util.stream.Collectors
 
 class SyncUtilsSpec extends Specification{
 
@@ -61,17 +62,17 @@
     def 'Get an advised Cm-Handle where ADVISED cm handle #scenario'() {
         given: 'the inventory persistence service returns a collection of data nodes'
             mockInventoryPersistence.getCmHandlesByState(CmHandleState.ADVISED) >> dataNodeCollection
-        when: 'get advised cm handle is called'
-            objectUnderTest.getAnAdvisedCmHandle()
+        when: 'get advised cm handles are fetched'
+            def yangModelCmHandles = objectUnderTest.getAdvisedCmHandles()
         then: 'the returned data node collection is the correct size'
-            dataNodeCollection.size() == expectedDataNodeSize
-        and: 'get yang model cm handles is invoked the correct number of times'
-           expectedCallsToGetYangModelCmHandle * mockInventoryPersistence.getYangModelCmHandle('cm-handle-123')
+            yangModelCmHandles.size() == expectedDataNodeSize
+        and: 'yang model collection contains the correct data'
+            yangModelCmHandles.stream().map(yangModel -> yangModel.id).collect(Collectors.toSet()) ==
+                    dataNodeCollection.stream().map(dataNode -> dataNode.leaves.get("id")).collect(Collectors.toSet())
         where: 'the following scenarios are used'
             scenario         | dataNodeCollection || expectedCallsToGetYangModelCmHandle | expectedDataNodeSize
-            'exists'         | [ dataNode ]       || 1                                   | 1
-            'does not exist' | [ ]                || 0                                   | 0
-
+            'exists'         | [dataNode]         || 1                                   | 1
+            'does not exist' | []                 || 0                                   | 0
     }
 
     def 'Update Lock Reason, Details and Attempts where lock reason #scenario'() {
@@ -120,7 +121,7 @@
         given: 'the inventory persistence service returns a collection of data nodes'
             mockInventoryPersistence.getCmHandlesByOperationalSyncState(DataStoreSyncState.UNSYNCHRONIZED) >> unSynchronizedDataNodes
             mockInventoryPersistence.getCmHandlesByIdAndState("cm-handle-123", CmHandleState.READY) >> readyDataNodes
-        when: 'get advised cm handle is called'
+        when: 'get advised cm handles are fetched'
             objectUnderTest.getAnUnSynchronizedReadyCmHandle()
         then: 'the returned data node collection is the correct size'
             readyDataNodes.size() == expectedDataNodeSize
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurerSpec.groovy
new file mode 100644
index 0000000..d4010aa
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/inventory/sync/config/WatchdogSchedulingConfigurerSpec.groovy
@@ -0,0 +1,58 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.inventory.sync.config
+
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.BeforeEach
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.context.ConfigurableApplicationContext
+import org.springframework.test.context.ContextConfiguration
+import spock.lang.Specification
+
+@SpringBootTest
+@ContextConfiguration(classes = [ConfigurableApplicationContext, WatchdogSchedulingConfigurer])
+class WatchdogSchedulingConfigurerSpec extends Specification {
+
+    @Autowired
+    private ConfigurableApplicationContext applicationContext;
+
+    def watchdogSchedulingConfigurer;
+
+    @BeforeEach
+    void setup() {
+        watchdogSchedulingConfigurer = (WatchdogSchedulingConfigurer) applicationContext.getBean("watchdogSchedulingConfigurer")
+    }
+
+    @AfterEach
+    void tearDown() {
+        if (applicationContext != null) {
+            applicationContext.close()
+        }
+    }
+
+    def 'Validate watchdog scheduling configuration'() {
+        given: 'task scheduler configuration properties are loaded as map'
+            def linkedHashMap = watchdogSchedulingConfigurer.taskScheduler().getProperties()
+        expect: 'thread name prefix is mapped correctly'
+            assert linkedHashMap.'threadNamePrefix' == 'watchdog-th-'
+    }
+}
diff --git a/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java b/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java
index 2d8f7fb..9327c53 100644
--- a/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java
+++ b/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java
@@ -21,6 +21,7 @@
 
 package org.onap.cps.config;
 
+import java.util.concurrent.ThreadPoolExecutor;
 import javax.validation.constraints.Min;
 import lombok.Setter;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -61,7 +62,10 @@
         executor.setMaxPoolSize(maxPoolSize);
         executor.setQueueCapacity(queueCapacity);
         executor.setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown);
+        executor.setKeepAliveSeconds(60);
         executor.setThreadNamePrefix(threadNamePrefix);
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+        executor.initialize();
         return executor;
     }