Fix ExecutionQueue cannot be cleaned

Fix that LCM Requests in Execution Queue continue after Stop OAM
request sent.
Fix that Appc-Ansible bundle cannot be fully stopped because
of NPE.

Issue-Id: APPC-159
Change-Id: I8f0a3a79a5c572ad84e66f71b4ddb47118704302
Signed-off-by: Hao Kuang <Hao.Kuang@amdocs.com>
diff --git a/appc-adapters/appc-ansible-adapter/appc-ansible-adapter-bundle/src/main/java/org/openecomp/appc/adapter/ansible/AnsibleActivator.java b/appc-adapters/appc-ansible-adapter/appc-ansible-adapter-bundle/src/main/java/org/openecomp/appc/adapter/ansible/AnsibleActivator.java
index fa81ef7..865841d 100644
--- a/appc-adapters/appc-ansible-adapter/appc-ansible-adapter-bundle/src/main/java/org/openecomp/appc/adapter/ansible/AnsibleActivator.java
+++ b/appc-adapters/appc-ansible-adapter/appc-ansible-adapter-bundle/src/main/java/org/openecomp/appc/adapter/ansible/AnsibleActivator.java
@@ -38,9 +38,6 @@
 
 /**
  * This activator is used to initialize and terminate an instance of AnsibleAdapter class
- * 
- * Author : Ashwin Sridharan
- * Date   : Oct 2016
  */
 public class AnsibleActivator implements BundleActivator {
 
@@ -57,12 +54,12 @@
     /**
      * The logger to be used
      */
-    private static final EELFLogger logger = EELFManager.getInstance().getLogger(AnsibleActivator.class);
+    private final EELFLogger logger = EELFManager.getInstance().getLogger(AnsibleActivator.class);
 
     /**
      * The configuration object used to configure this bundle
      */
-    private Configuration configuration;
+    private final Configuration configuration = ConfigurationFactory.getConfiguration();
 
     /**
      * Called when this bundle is started so the Framework can perform the bundle-specific activities necessary to start
@@ -70,26 +67,24 @@
      * <p>
      * This method must complete and return to its caller in a timely manner.
      * </p>
-     * 
-     * @param context
-     *            The execution context of the bundle being started.
-     * @throws java.lang.Exception
-     *             If this method throws an exception, this bundle is marked as stopped and the Framework will remove
-     *             this bundle's listeners, unregister all services registered by this bundle, and release all services
-     *             used by this bundle.
+     *
+     * @param context The execution context of the bundle being started.
+     * @throws java.lang.Exception If this method throws an exception, this bundle is marked as stopped and the
+     *                             Framework will remove this bundle's listeners, unregister all services registered
+     *                             by this bundle, and release all services used by this bundle.
      * @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext)
      */
     @Override
     public void start(BundleContext context) throws Exception {
 
         logger.info("Starting bundle " + getName());
-	String appName = "APPC: ";
+        String appName = "APPC: ";
         logger.info(Msg.COMPONENT_INITIALIZING, appName, "Ansible Adapter");
-	adapter = new AnsibleAdapterImpl();
-	
+        adapter = new AnsibleAdapterImpl();
+
         if (registration == null) {
             logger.info(Msg.REGISTERING_SERVICE, appName, adapter.getAdapterName(),
-			AnsibleAdapter.class.getSimpleName());
+                AnsibleAdapter.class.getSimpleName());
             registration = context.registerService(AnsibleAdapter.class, adapter, null);
         }
 
@@ -104,13 +99,11 @@
      * <p>
      * This method must complete and return to its caller in a timely manner.
      * </p>
-     * 
-     * @param context
-     *            The execution context of the bundle being stopped.
-     * @throws java.lang.Exception
-     *             If this method throws an exception, the bundle is still marked as stopped, and the Framework will
-     *             remove the bundle's listeners, unregister all services registered by the bundle, and release all
-     *             services used by the bundle. *
+     *
+     * @param context The execution context of the bundle being stopped.
+     * @throws java.lang.Exception If this method throws an exception, the bundle is still marked as stopped, and the
+     *                             Framework will remove the bundle's listeners, unregister all services registered
+     *                             by the bundle, and release all services used by the bundle.
      * @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext)
      */
     @Override
@@ -130,5 +123,4 @@
     public String getName() {
         return "APPC Ansible Adapter";
     }
-
 }
diff --git a/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/java/org/openecomp/appc/executor/impl/CommandExecutorImpl.java b/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/java/org/openecomp/appc/executor/impl/CommandExecutorImpl.java
index 5054d34..f7ffdad 100644
--- a/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/java/org/openecomp/appc/executor/impl/CommandExecutorImpl.java
+++ b/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/java/org/openecomp/appc/executor/impl/CommandExecutorImpl.java
@@ -28,45 +28,50 @@
 package org.openecomp.appc.executor.impl;
 
 
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.apache.commons.lang.ObjectUtils;
+import org.openecomp.appc.domainmodel.lcm.RuntimeContext;
+import org.openecomp.appc.exceptions.APPCException;
+import org.openecomp.appc.executionqueue.ExecutionQueueService;
+import org.openecomp.appc.executor.CommandExecutor;
+
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.lang.ObjectUtils;
-import org.openecomp.appc.domainmodel.lcm.RuntimeContext;
-import org.openecomp.appc.domainmodel.lcm.ActionLevel;
-import org.openecomp.appc.exceptions.APPCException;
-import org.openecomp.appc.executionqueue.ExecutionQueueService;
-import org.openecomp.appc.executionqueue.impl.ExecutionQueueServiceFactory;
-import org.openecomp.appc.executor.CommandExecutor;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
 
 public class CommandExecutorImpl implements CommandExecutor {
 
-    private CommandTaskFactory executionTaskFactory ;
+    private CommandTaskFactory executionTaskFactory;
     private static final EELFLogger logger = EELFManager.getInstance().getLogger(CommandExecutorImpl.class);
 
     private ExecutionQueueService executionQueueService;
     private ExpiredMessageHandler expiredMessageHandler;
 
-    public CommandExecutorImpl(){
+    public CommandExecutorImpl() {
 
     }
 
+    /**
+     * Injected by blueprint
+     *
+     * @param executionQueueService
+     */
     public void setExecutionQueueService(ExecutionQueueService executionQueueService) {
         this.executionQueueService = executionQueueService;
     }
 
+    /**
+     * Injected by blueprint
+     * @param expiredMessageHandler
+     */
     public void setExpiredMessageHandler(ExpiredMessageHandler expiredMessageHandler) {
         this.expiredMessageHandler = expiredMessageHandler;
     }
 
     public void initialize() {
         logger.info("initialization started of CommandExecutorImpl");
-        executionQueueService = ExecutionQueueServiceFactory.getExecutionQueueService();
         executionQueueService.registerMessageExpirationListener(expiredMessageHandler);
     }
 
@@ -77,13 +82,14 @@
     /**
      * Execute given command
      * Create command request and enqueue it for execution.
+     *
      * @param commandExecutorInput Contains CommandHeader,  command , target Id , payload and conf ID (optional)
      * @throws APPCException in case of error.
      */
     @Override
-    public void executeCommand (RuntimeContext commandExecutorInput) throws APPCException{
+    public void executeCommand(RuntimeContext commandExecutorInput) throws APPCException {
         if (logger.isTraceEnabled()) {
-            logger.trace("Entering to executeCommand with CommandExecutorInput = "+ ObjectUtils.toString(commandExecutorInput));
+            logger.trace("Entering to executeCommand with CommandExecutorInput = " + ObjectUtils.toString(commandExecutorInput));
         }
         enqueRequest(commandExecutorInput);
         if (logger.isTraceEnabled()) {
@@ -91,30 +97,31 @@
         }
     }
 
-    private RuntimeContext getCommandRequest(RuntimeContext commandExecutorInput){
+    private RuntimeContext getCommandRequest(RuntimeContext commandExecutorInput) {
         if (logger.isTraceEnabled()) {
-            logger.trace("Entering to getCommandRequest with CommandExecutorInput = "+ ObjectUtils.toString(commandExecutorInput));
+            logger.trace("Entering to getCommandRequest with CommandExecutorInput = " + ObjectUtils.toString(commandExecutorInput));
         }
         RuntimeContext commandRequest;
         commandRequest = commandExecutorInput;
         if (logger.isTraceEnabled()) {
-            logger.trace("Exiting from getCommandRequest with (CommandRequest = "+ ObjectUtils.toString(commandRequest)+")");
+            logger.trace("Exiting from getCommandRequest with (CommandRequest = " + ObjectUtils.toString(commandRequest) + ")");
         }
         return commandRequest;
     }
 
     @SuppressWarnings("unchecked")
-    private void enqueRequest(RuntimeContext request) throws APPCException{
+    private void enqueRequest(RuntimeContext request) throws APPCException {
         if (logger.isTraceEnabled()) {
-            logger.trace("Entering to enqueRequest with CommandRequest = "+ ObjectUtils.toString(request));
+            logger.trace("Entering to enqueRequest with CommandRequest = " + ObjectUtils.toString(request));
         }
         try {
-            String action = request.getRequestContext().getAction().name();
             CommandTask commandTask = executionTaskFactory.getExecutionTask(request);
+
             long remainingTTL = getRemainingTTL(request);
-            executionQueueService.putMessage(commandTask,remainingTTL, TimeUnit.MILLISECONDS);
+
+            executionQueueService.putMessage(commandTask, remainingTTL, TimeUnit.MILLISECONDS);
         } catch (Exception e) {
-            logger.error("Exception: "+e.getMessage());
+            logger.error("Exception: " + e.getMessage());
             throw new APPCException(e);
         }
 
@@ -129,9 +136,9 @@
         return ChronoUnit.MILLIS.between(Instant.now(), requestTimestamp.plusSeconds(ttl));
     }
 
-    private CommandTask getMessageExecutor(RuntimeContext request){
+    private CommandTask getMessageExecutor(RuntimeContext request) {
         if (logger.isTraceEnabled()) {
-            logger.trace("Entering to getMessageExecutor with command = "+ request);
+            logger.trace("Entering to getMessageExecutor with command = " + request);
         }
         CommandTask executionTask = executionTaskFactory.getExecutionTask(request);
         if (logger.isTraceEnabled()) {
@@ -139,6 +146,4 @@
         }
         return executionTask;
     }
-
-
 }
diff --git a/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index f84e972..5474dcc 100644
--- a/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -31,27 +31,36 @@
            xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd">
     <bean id="commandExecutorBean" class="org.openecomp.appc.executor.impl.CommandExecutorImpl"
           scope="singleton" init-method="initialize">
+        <property name="executionQueueService" ref="ExecutionQueueServiceRef"/>
         <property name="executionTaskFactory" ref="CommandExecutionTaskFactory"/>
         <property name="expiredMessageHandler" ref="expiredMessageHandlerBean"/>
     </bean>
 
-    <bean id="CommandExecutionTaskFactory" class="org.openecomp.appc.executor.impl.CommandTaskFactory" scope="singleton" >
-        <property name="vnfRequestHandler" ref="vnfRequestHandlerService" />
+    <bean id="CommandExecutionTaskFactory" class="org.openecomp.appc.executor.impl.CommandTaskFactory"
+          scope="singleton">
+        <property name="vnfRequestHandler" ref="vnfRequestHandlerService"/>
         <property name="vmRequestHandler" ref="vmRequestHandlerService"/>
-        <property name="workflowManager" ref="WorkFlowManagerRef" />
-        <property name="lifecyclemanager" ref="LifecyclemanagerRef" />
+        <property name="workflowManager" ref="WorkFlowManagerRef"/>
+        <property name="lifecyclemanager" ref="LifecyclemanagerRef"/>
     </bean>
 
     <bean id="expiredMessageHandlerBean" class="org.openecomp.appc.executor.impl.ExpiredMessageHandler"
           scope="singleton">
-        <property name="vnfRequestHandler" ref="vnfRequestHandlerService" />
+        <property name="vnfRequestHandler" ref="vnfRequestHandlerService"/>
         <property name="vmRequestHandler" ref="vmRequestHandlerService"/>
     </bean>
 
-	<reference id="WorkFlowManagerRef" availability="mandatory" activation="eager" interface="org.openecomp.appc.workflow.WorkFlowManager" />
-	<reference id="vnfRequestHandlerService" availability="optional" activation="eager" interface="org.openecomp.appc.requesthandler.RequestHandler" filter="(level=VNF)" />
-    <reference id="vmRequestHandlerService" availability="optional" activation="eager" interface="org.openecomp.appc.requesthandler.RequestHandler" filter="(level=VM)" />
-    <reference id="LifecyclemanagerRef" availability="mandatory" activation="eager" interface="org.openecomp.appc.lifecyclemanager.LifecycleManager" />
+    <reference id="WorkFlowManagerRef" availability="mandatory" activation="eager"
+               interface="org.openecomp.appc.workflow.WorkFlowManager"/>
+    <reference id="vnfRequestHandlerService" availability="optional" activation="eager"
+               interface="org.openecomp.appc.requesthandler.RequestHandler" filter="(level=VNF)"/>
+    <reference id="vmRequestHandlerService" availability="optional" activation="eager"
+               interface="org.openecomp.appc.requesthandler.RequestHandler" filter="(level=VM)"/>
+    <reference id="LifecyclemanagerRef" availability="mandatory" activation="eager"
+               interface="org.openecomp.appc.lifecyclemanager.LifecycleManager"/>
+    <reference id="ExecutionQueueServiceRef" availability="mandatory" activation="eager"
+               interface="org.openecomp.appc.executionqueue.ExecutionQueueService"/>
 
-    <service id="commandExecutorService" interface="org.openecomp.appc.executor.CommandExecutor" ref="commandExecutorBean"/>
+    <service id="commandExecutorService" interface="org.openecomp.appc.executor.CommandExecutor"
+             ref="commandExecutorBean"/>
 </blueprint>
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java
index 4f97a97..8670ada 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java
@@ -24,49 +24,76 @@
 
 package org.openecomp.appc.executionqueue.helper;
 
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
 import org.openecomp.appc.configuration.Configuration;
 import org.openecomp.appc.configuration.ConfigurationFactory;
+import org.openecomp.appc.executionqueue.impl.QueueManager;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class Util {
 
-    private static final Configuration configuration = ConfigurationFactory.getConfiguration();
+    private final EELFLogger logger = EELFManager.getInstance().getLogger(Util.class);
+    private final int default_queue_size = 10;
+    private final int default_threadpool_size = 10;
+    private final String queue_size_key = "appc.dispatcher.executionqueue.backlog.size";
+    private final String threadpool_size_key = "appc.dispatcher.executionqueue.threadpool.size";
 
-    public static int DEFAULT_QUEUE_SIZE = 10;
-    public static int DEFAULT_THREADPOOL_SIZE = 10;
+    private Configuration configuration;
 
-    public static int getExecutionQueSize(){
-        String sizeStr = configuration.getProperty("appc.dispatcher.executionqueue.backlog.size", String.valueOf(DEFAULT_QUEUE_SIZE));
-        int size = DEFAULT_QUEUE_SIZE;
-        try{
+    /**
+     * Initialization.
+     * <p>Used by blueprint.
+     */
+    public void init() {
+
+        configuration = ConfigurationFactory.getConfiguration();
+    }
+
+    public int getExecutionQueueSize() {
+        String sizeStr = configuration.getProperty(queue_size_key, String.valueOf(default_queue_size));
+
+        int size = default_queue_size;
+        try {
             size = Integer.parseInt(sizeStr);
+        } catch (NumberFormatException e) {
+            logger.error("Error while parse key:" + queue_size_key + " got from configuration " + e.getMessage(), e);
         }
-        catch (NumberFormatException e){
 
-        }
         return size;
     }
 
-    public static int getThreadPoolSize(){
-        String sizeStr = configuration.getProperty("appc.dispatcher.executionqueue.threadpool.size", String.valueOf(DEFAULT_THREADPOOL_SIZE));
-        int size = DEFAULT_THREADPOOL_SIZE;
-        try{
-            size = Integer.parseInt(sizeStr);
-        }
-        catch (NumberFormatException e){
+    public int getThreadPoolSize() {
+        String sizeStr = configuration.getProperty(threadpool_size_key, String.valueOf(default_threadpool_size));
 
+        int size = default_threadpool_size;
+        try {
+            size = Integer.parseInt(sizeStr);
+        } catch (NumberFormatException e) {
+            logger.error("Error while parse key:" + threadpool_size_key + " got from configuration "
+                + e.getMessage(), e);
         }
+
         return size;
     }
 
-    public static ThreadFactory getThreadFactory(final boolean isDaemon){
+    public ThreadFactory getThreadFactory(final boolean isDaemon, final String threadNamePrefix) {
         return new ThreadFactory() {
-            final ThreadFactory factory = Executors.defaultThreadFactory();
+            private final String THREAD_NAME_PATTERN = "%s-%d";
+            private final ThreadFactory factory = Executors.defaultThreadFactory();
+            private final AtomicInteger counter = new AtomicInteger();
+
             public Thread newThread(Runnable r) {
                 Thread t = factory.newThread(r);
                 t.setDaemon(isDaemon);
+                if (threadNamePrefix != null && !threadNamePrefix.isEmpty()) {
+                    final String threadName = String.format(THREAD_NAME_PATTERN, threadNamePrefix, counter
+                        .incrementAndGet());
+                    t.setName(threadName);
+                }
                 return t;
             }
         };
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java
index 3092bd8..c29078c 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java
@@ -24,51 +24,54 @@
 
 package org.openecomp.appc.executionqueue.impl;
 
-import java.time.Instant;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.concurrent.TimeUnit;
-
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
 import org.openecomp.appc.exceptions.APPCException;
 import org.openecomp.appc.executionqueue.ExecutionQueueService;
 import org.openecomp.appc.executionqueue.MessageExpirationListener;
 import org.openecomp.appc.executionqueue.impl.object.QueueMessage;
 
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
+import java.time.Instant;
+import java.util.concurrent.TimeUnit;
 
 public class ExecutionQueueServiceImpl<M extends Runnable> implements ExecutionQueueService<M> {
 
     private static final EELFLogger logger =
-            EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class);
+        EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class);
 
-    ExecutionQueueServiceImpl(){
+    private QueueManager queueManager;
 
+    public ExecutionQueueServiceImpl() {
+        //do nothing
+    }
+
+    /**
+     * Injected by blueprint
+     *
+     * @param queueManager queue manager to be set
+     */
+    public void setQueueManager(QueueManager queueManager) {
+        this.queueManager = queueManager;
     }
 
     @Override
     public void putMessage(M message) throws APPCException {
-         this.putMessage(message,-1,null);
+        this.putMessage(message, -1, null);
     }
 
     @Override
-    public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException{
-        try {
-            Instant expirationTime = calculateExpirationTime(timeout,unit);
-            QueueManager queueManager = QueueManager.getInstance();
-            boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<M>(message,expirationTime));
-            if(!enqueueTask){
-                throw new APPCException("failed to put message in queue");
-            }
-        } catch (Exception e) {
-            logger.error("Error in putMessage method of ExecutionQueueServiceImpl" + e.getMessage());
-            throw new APPCException(e);
+    public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException {
+        Instant expirationTime = calculateExpirationTime(timeout, unit);
+        boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<>(message, expirationTime));
+        if (!enqueueTask) {
+            logger.error("Error in putMessage method of ExecutionQueueServiceImpl");
+            throw new APPCException("Failed to put message in queue");
         }
     }
 
     @Override
     public void registerMessageExpirationListener(MessageExpirationListener listener) {
-        QueueManager.getInstance().setListener(listener);
+        queueManager.setListener(listener);
     }
 
     private Instant calculateExpirationTime(long timeToLive, TimeUnit unit) {
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java
index 11d0b8d..b78f399 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java
@@ -24,82 +24,87 @@
 
 package org.openecomp.appc.executionqueue.impl;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
 import org.openecomp.appc.executionqueue.MessageExpirationListener;
 import org.openecomp.appc.executionqueue.helper.Util;
 import org.openecomp.appc.executionqueue.impl.object.QueueMessage;
 
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 public class QueueManager {
 
-    private LinkedBlockingQueue<QueueMessage<? extends Runnable>> queue;
+    private final EELFLogger logger = EELFManager.getInstance().getLogger(QueueManager.class);
 
     private MessageExpirationListener listener;
-
-    private static int MAX_QUEUE_SIZE = Util.getExecutionQueSize();
-
-    private static int MAX_THREAD_SIZE = Util.getThreadPoolSize();
-
     private ExecutorService messageExecutor;
+    private int max_thread_size;
+    private int max_queue_size;
+    private Util executionQueueUtil;
 
-    private static final EELFLogger logger =
-            EELFManager.getInstance().getLogger(QueueManager.class);
-
-    private QueueManager(){
-        init();
+    public QueueManager() {
+        //do nothing
     }
 
-    private static class QueueManagerHolder {
-        private static final QueueManager INSTANCE = new QueueManager();
+    /**
+     * Initialization method used by blueprint
+     */
+    public void init() {
+        max_thread_size = executionQueueUtil.getThreadPoolSize();
+        max_queue_size = executionQueueUtil.getExecutionQueueSize();
+        messageExecutor = new ThreadPoolExecutor(
+            max_thread_size,
+            max_thread_size,
+            0L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue(max_queue_size),
+            executionQueueUtil.getThreadFactory(true, "appc-dispatcher"),
+            new ThreadPoolExecutor.AbortPolicy());
     }
 
-    public static QueueManager getInstance() {
-        return QueueManagerHolder.INSTANCE;
-    }
-
-    private void init(){
-        queue = new LinkedBlockingQueue<QueueMessage<? extends Runnable>>(MAX_QUEUE_SIZE);
-        messageExecutor = Executors.newFixedThreadPool(MAX_THREAD_SIZE,Util.getThreadFactory(true));
-
-        for(int i=0;i<MAX_THREAD_SIZE;i++){
-            messageExecutor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    while (true){
-                        try{
-                            QueueMessage<? extends Runnable> queueMessage = queue.take();
-                            if (queueMessage.isExpired()) {
-                                logger.debug("Message expired "+ queueMessage.getMessage());
-                                if(listener != null){
-                                    listener.onMessageExpiration(queueMessage.getMessage());
-                                }
-                                else{
-                                    logger.warn("Listener not available for expired message ");
-                                }
-                            }
-                            else{
-                                queueMessage.getMessage().run();
-                            }
-                        } catch (Exception e) {
-                            logger.error("Error in startMessagePolling method of ExecutionQueueServiceImpl" + e.getMessage());
-                        }
-                    }
-                }
-            });
-        }
+    /**
+     * Destory method used by blueprint
+     */
+    public void stop() {
+        messageExecutor.shutdownNow();
     }
 
     public void setListener(MessageExpirationListener listener) {
         this.listener = listener;
     }
 
-    public boolean enqueueTask(QueueMessage<? extends Runnable> queueMessage) {
-        return queue.offer(queueMessage);
+    /**
+     * Injected by blueprint
+     *
+     * @param executionQueueUtil Util to be set
+     */
+    public void setExecutionQueueUtil(Util executionQueueUtil) {
+        this.executionQueueUtil = executionQueueUtil;
     }
 
+    public boolean enqueueTask(QueueMessage queueMessage) {
+        boolean isEnqueued = true;
+        try {
+            messageExecutor.execute(() -> {
+                if (queueMessage.isExpired()) {
+                    logger.debug("Message expired " + queueMessage.getMessage());
+                    if (listener != null) {
+                        listener.onMessageExpiration(queueMessage.getMessage());
+                    } else {
+                        logger.warn("Listener not available for expired message ");
+                    }
+                } else {
+                    queueMessage.getMessage().run();
+                }
+            });
+        } catch (RejectedExecutionException ree) {
+            isEnqueued = false;
+        }
+
+        return isEnqueued;
+    }
 }
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/ExecutionQueueServiceTest.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/ExecutionQueueServiceTest.java
new file mode 100644
index 0000000..067b6c3
--- /dev/null
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/ExecutionQueueServiceTest.java
@@ -0,0 +1,68 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * 
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.executionqueue;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.openecomp.appc.exceptions.APPCException;
+import org.openecomp.appc.executionqueue.helper.Util;
+import org.openecomp.appc.executionqueue.impl.ExecutionQueueServiceImpl;
+import org.openecomp.appc.executionqueue.impl.QueueManager;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+
+@RunWith(PowerMockRunner.class)
+public class ExecutionQueueServiceTest {
+
+    @InjectMocks
+    private ExecutionQueueServiceImpl service;
+    @Spy
+    private QueueManager queueManager = new QueueManager();
+    @Spy
+    private Util executionQueueUtil = new Util();
+
+    @Before
+    public void setup() {
+        Mockito.doReturn(true).when(queueManager).enqueueTask(any());
+    }
+
+    @Test
+    public void testPositiveFlow() {
+        Message message = new Message();
+        try {
+            service.putMessage(message);
+            Mockito.verify(queueManager, times(1)).enqueueTask(any());
+        } catch (APPCException e) {
+            Assert.fail(e.toString());
+        }
+    }
+}
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/TestExecutionQueueService.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/TestExecutionQueueService.java
deleted file mode 100644
index 6e95848..0000000
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/TestExecutionQueueService.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.appc.executionqueue;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.openecomp.appc.exceptions.APPCException;
-import org.openecomp.appc.executionqueue.ExecutionQueueService;
-import org.openecomp.appc.executionqueue.impl.ExecutionQueueServiceFactory;
-import org.powermock.api.mockito.PowerMockito;
-
-import java.util.concurrent.TimeUnit;
-
-
-public class TestExecutionQueueService {
-
-    @Test
-    public void testPositiveFlow(){
-        Message message = new Message();
-        ExecutionQueueService service =  ExecutionQueueServiceFactory.getExecutionQueueService();
-        try {
-            service.putMessage(message);
-            waitFor(5000);
-            Assert.assertTrue(message.isRunExecuted());
-        } catch (APPCException e) {
-            Assert.fail(e.toString());
-        }
-    }
-
-//    @Test
-    public void testTimeout(){
-        ExecutionQueueService service =  ExecutionQueueServiceFactory.getExecutionQueueService();
-        Message message = new Message();
-        Listener listener = new Listener();
-        service.registerMessageExpirationListener(listener);
-        try {
-            service.putMessage(message,1, TimeUnit.MILLISECONDS);
-            waitFor(5000);
-            Assert.assertTrue(listener.isListenerExecuted());
-        } catch (APPCException e) {
-            e.printStackTrace();
-        }
-    }
-
-    private void waitFor(long milliSeconds){
-        try {
-            Thread.sleep(milliSeconds);
-        } catch (InterruptedException e) {
-            Assert.fail(e.toString());
-        }
-    }
-}