Fix ExecutionQueue cannot be cleaned
Fix that LCM Requests in Execution Queue continue after Stop OAM
request sent.
Fix that Appc-Ansible bundle cannot be fully stopped because
of NPE.
Issue-Id: APPC-159
Change-Id: I8f0a3a79a5c572ad84e66f71b4ddb47118704302
Signed-off-by: Hao Kuang <Hao.Kuang@amdocs.com>
diff --git a/appc-adapters/appc-ansible-adapter/appc-ansible-adapter-bundle/src/main/java/org/openecomp/appc/adapter/ansible/AnsibleActivator.java b/appc-adapters/appc-ansible-adapter/appc-ansible-adapter-bundle/src/main/java/org/openecomp/appc/adapter/ansible/AnsibleActivator.java
index fa81ef7..865841d 100644
--- a/appc-adapters/appc-ansible-adapter/appc-ansible-adapter-bundle/src/main/java/org/openecomp/appc/adapter/ansible/AnsibleActivator.java
+++ b/appc-adapters/appc-ansible-adapter/appc-ansible-adapter-bundle/src/main/java/org/openecomp/appc/adapter/ansible/AnsibleActivator.java
@@ -38,9 +38,6 @@
/**
* This activator is used to initialize and terminate an instance of AnsibleAdapter class
- *
- * Author : Ashwin Sridharan
- * Date : Oct 2016
*/
public class AnsibleActivator implements BundleActivator {
@@ -57,12 +54,12 @@
/**
* The logger to be used
*/
- private static final EELFLogger logger = EELFManager.getInstance().getLogger(AnsibleActivator.class);
+ private final EELFLogger logger = EELFManager.getInstance().getLogger(AnsibleActivator.class);
/**
* The configuration object used to configure this bundle
*/
- private Configuration configuration;
+ private final Configuration configuration = ConfigurationFactory.getConfiguration();
/**
* Called when this bundle is started so the Framework can perform the bundle-specific activities necessary to start
@@ -70,26 +67,24 @@
* <p>
* This method must complete and return to its caller in a timely manner.
* </p>
- *
- * @param context
- * The execution context of the bundle being started.
- * @throws java.lang.Exception
- * If this method throws an exception, this bundle is marked as stopped and the Framework will remove
- * this bundle's listeners, unregister all services registered by this bundle, and release all services
- * used by this bundle.
+ *
+ * @param context The execution context of the bundle being started.
+ * @throws java.lang.Exception If this method throws an exception, this bundle is marked as stopped and the
+ * Framework will remove this bundle's listeners, unregister all services registered
+ * by this bundle, and release all services used by this bundle.
* @see org.osgi.framework.BundleActivator#start(org.osgi.framework.BundleContext)
*/
@Override
public void start(BundleContext context) throws Exception {
logger.info("Starting bundle " + getName());
- String appName = "APPC: ";
+ String appName = "APPC: ";
logger.info(Msg.COMPONENT_INITIALIZING, appName, "Ansible Adapter");
- adapter = new AnsibleAdapterImpl();
-
+ adapter = new AnsibleAdapterImpl();
+
if (registration == null) {
logger.info(Msg.REGISTERING_SERVICE, appName, adapter.getAdapterName(),
- AnsibleAdapter.class.getSimpleName());
+ AnsibleAdapter.class.getSimpleName());
registration = context.registerService(AnsibleAdapter.class, adapter, null);
}
@@ -104,13 +99,11 @@
* <p>
* This method must complete and return to its caller in a timely manner.
* </p>
- *
- * @param context
- * The execution context of the bundle being stopped.
- * @throws java.lang.Exception
- * If this method throws an exception, the bundle is still marked as stopped, and the Framework will
- * remove the bundle's listeners, unregister all services registered by the bundle, and release all
- * services used by the bundle. *
+ *
+ * @param context The execution context of the bundle being stopped.
+ * @throws java.lang.Exception If this method throws an exception, the bundle is still marked as stopped, and the
+ * Framework will remove the bundle's listeners, unregister all services registered
+ * by the bundle, and release all services used by the bundle.
* @see org.osgi.framework.BundleActivator#stop(org.osgi.framework.BundleContext)
*/
@Override
@@ -130,5 +123,4 @@
public String getName() {
return "APPC Ansible Adapter";
}
-
}
diff --git a/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/java/org/openecomp/appc/executor/impl/CommandExecutorImpl.java b/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/java/org/openecomp/appc/executor/impl/CommandExecutorImpl.java
index 5054d34..f7ffdad 100644
--- a/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/java/org/openecomp/appc/executor/impl/CommandExecutorImpl.java
+++ b/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/java/org/openecomp/appc/executor/impl/CommandExecutorImpl.java
@@ -28,45 +28,50 @@
package org.openecomp.appc.executor.impl;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.apache.commons.lang.ObjectUtils;
+import org.openecomp.appc.domainmodel.lcm.RuntimeContext;
+import org.openecomp.appc.exceptions.APPCException;
+import org.openecomp.appc.executionqueue.ExecutionQueueService;
+import org.openecomp.appc.executor.CommandExecutor;
+
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.ObjectUtils;
-import org.openecomp.appc.domainmodel.lcm.RuntimeContext;
-import org.openecomp.appc.domainmodel.lcm.ActionLevel;
-import org.openecomp.appc.exceptions.APPCException;
-import org.openecomp.appc.executionqueue.ExecutionQueueService;
-import org.openecomp.appc.executionqueue.impl.ExecutionQueueServiceFactory;
-import org.openecomp.appc.executor.CommandExecutor;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
public class CommandExecutorImpl implements CommandExecutor {
- private CommandTaskFactory executionTaskFactory ;
+ private CommandTaskFactory executionTaskFactory;
private static final EELFLogger logger = EELFManager.getInstance().getLogger(CommandExecutorImpl.class);
private ExecutionQueueService executionQueueService;
private ExpiredMessageHandler expiredMessageHandler;
- public CommandExecutorImpl(){
+ public CommandExecutorImpl() {
}
+ /**
+ * Injected by blueprint
+ *
+ * @param executionQueueService
+ */
public void setExecutionQueueService(ExecutionQueueService executionQueueService) {
this.executionQueueService = executionQueueService;
}
+ /**
+ * Injected by blueprint
+ * @param expiredMessageHandler
+ */
public void setExpiredMessageHandler(ExpiredMessageHandler expiredMessageHandler) {
this.expiredMessageHandler = expiredMessageHandler;
}
public void initialize() {
logger.info("initialization started of CommandExecutorImpl");
- executionQueueService = ExecutionQueueServiceFactory.getExecutionQueueService();
executionQueueService.registerMessageExpirationListener(expiredMessageHandler);
}
@@ -77,13 +82,14 @@
/**
* Execute given command
* Create command request and enqueue it for execution.
+ *
* @param commandExecutorInput Contains CommandHeader, command , target Id , payload and conf ID (optional)
* @throws APPCException in case of error.
*/
@Override
- public void executeCommand (RuntimeContext commandExecutorInput) throws APPCException{
+ public void executeCommand(RuntimeContext commandExecutorInput) throws APPCException {
if (logger.isTraceEnabled()) {
- logger.trace("Entering to executeCommand with CommandExecutorInput = "+ ObjectUtils.toString(commandExecutorInput));
+ logger.trace("Entering to executeCommand with CommandExecutorInput = " + ObjectUtils.toString(commandExecutorInput));
}
enqueRequest(commandExecutorInput);
if (logger.isTraceEnabled()) {
@@ -91,30 +97,31 @@
}
}
- private RuntimeContext getCommandRequest(RuntimeContext commandExecutorInput){
+ private RuntimeContext getCommandRequest(RuntimeContext commandExecutorInput) {
if (logger.isTraceEnabled()) {
- logger.trace("Entering to getCommandRequest with CommandExecutorInput = "+ ObjectUtils.toString(commandExecutorInput));
+ logger.trace("Entering to getCommandRequest with CommandExecutorInput = " + ObjectUtils.toString(commandExecutorInput));
}
RuntimeContext commandRequest;
commandRequest = commandExecutorInput;
if (logger.isTraceEnabled()) {
- logger.trace("Exiting from getCommandRequest with (CommandRequest = "+ ObjectUtils.toString(commandRequest)+")");
+ logger.trace("Exiting from getCommandRequest with (CommandRequest = " + ObjectUtils.toString(commandRequest) + ")");
}
return commandRequest;
}
@SuppressWarnings("unchecked")
- private void enqueRequest(RuntimeContext request) throws APPCException{
+ private void enqueRequest(RuntimeContext request) throws APPCException {
if (logger.isTraceEnabled()) {
- logger.trace("Entering to enqueRequest with CommandRequest = "+ ObjectUtils.toString(request));
+ logger.trace("Entering to enqueRequest with CommandRequest = " + ObjectUtils.toString(request));
}
try {
- String action = request.getRequestContext().getAction().name();
CommandTask commandTask = executionTaskFactory.getExecutionTask(request);
+
long remainingTTL = getRemainingTTL(request);
- executionQueueService.putMessage(commandTask,remainingTTL, TimeUnit.MILLISECONDS);
+
+ executionQueueService.putMessage(commandTask, remainingTTL, TimeUnit.MILLISECONDS);
} catch (Exception e) {
- logger.error("Exception: "+e.getMessage());
+ logger.error("Exception: " + e.getMessage());
throw new APPCException(e);
}
@@ -129,9 +136,9 @@
return ChronoUnit.MILLIS.between(Instant.now(), requestTimestamp.plusSeconds(ttl));
}
- private CommandTask getMessageExecutor(RuntimeContext request){
+ private CommandTask getMessageExecutor(RuntimeContext request) {
if (logger.isTraceEnabled()) {
- logger.trace("Entering to getMessageExecutor with command = "+ request);
+ logger.trace("Entering to getMessageExecutor with command = " + request);
}
CommandTask executionTask = executionTaskFactory.getExecutionTask(request);
if (logger.isTraceEnabled()) {
@@ -139,6 +146,4 @@
}
return executionTask;
}
-
-
}
diff --git a/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index f84e972..5474dcc 100644
--- a/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/appc-dispatcher/appc-command-executor/appc-command-executor-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -31,27 +31,36 @@
xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd">
<bean id="commandExecutorBean" class="org.openecomp.appc.executor.impl.CommandExecutorImpl"
scope="singleton" init-method="initialize">
+ <property name="executionQueueService" ref="ExecutionQueueServiceRef"/>
<property name="executionTaskFactory" ref="CommandExecutionTaskFactory"/>
<property name="expiredMessageHandler" ref="expiredMessageHandlerBean"/>
</bean>
- <bean id="CommandExecutionTaskFactory" class="org.openecomp.appc.executor.impl.CommandTaskFactory" scope="singleton" >
- <property name="vnfRequestHandler" ref="vnfRequestHandlerService" />
+ <bean id="CommandExecutionTaskFactory" class="org.openecomp.appc.executor.impl.CommandTaskFactory"
+ scope="singleton">
+ <property name="vnfRequestHandler" ref="vnfRequestHandlerService"/>
<property name="vmRequestHandler" ref="vmRequestHandlerService"/>
- <property name="workflowManager" ref="WorkFlowManagerRef" />
- <property name="lifecyclemanager" ref="LifecyclemanagerRef" />
+ <property name="workflowManager" ref="WorkFlowManagerRef"/>
+ <property name="lifecyclemanager" ref="LifecyclemanagerRef"/>
</bean>
<bean id="expiredMessageHandlerBean" class="org.openecomp.appc.executor.impl.ExpiredMessageHandler"
scope="singleton">
- <property name="vnfRequestHandler" ref="vnfRequestHandlerService" />
+ <property name="vnfRequestHandler" ref="vnfRequestHandlerService"/>
<property name="vmRequestHandler" ref="vmRequestHandlerService"/>
</bean>
- <reference id="WorkFlowManagerRef" availability="mandatory" activation="eager" interface="org.openecomp.appc.workflow.WorkFlowManager" />
- <reference id="vnfRequestHandlerService" availability="optional" activation="eager" interface="org.openecomp.appc.requesthandler.RequestHandler" filter="(level=VNF)" />
- <reference id="vmRequestHandlerService" availability="optional" activation="eager" interface="org.openecomp.appc.requesthandler.RequestHandler" filter="(level=VM)" />
- <reference id="LifecyclemanagerRef" availability="mandatory" activation="eager" interface="org.openecomp.appc.lifecyclemanager.LifecycleManager" />
+ <reference id="WorkFlowManagerRef" availability="mandatory" activation="eager"
+ interface="org.openecomp.appc.workflow.WorkFlowManager"/>
+ <reference id="vnfRequestHandlerService" availability="optional" activation="eager"
+ interface="org.openecomp.appc.requesthandler.RequestHandler" filter="(level=VNF)"/>
+ <reference id="vmRequestHandlerService" availability="optional" activation="eager"
+ interface="org.openecomp.appc.requesthandler.RequestHandler" filter="(level=VM)"/>
+ <reference id="LifecyclemanagerRef" availability="mandatory" activation="eager"
+ interface="org.openecomp.appc.lifecyclemanager.LifecycleManager"/>
+ <reference id="ExecutionQueueServiceRef" availability="mandatory" activation="eager"
+ interface="org.openecomp.appc.executionqueue.ExecutionQueueService"/>
- <service id="commandExecutorService" interface="org.openecomp.appc.executor.CommandExecutor" ref="commandExecutorBean"/>
+ <service id="commandExecutorService" interface="org.openecomp.appc.executor.CommandExecutor"
+ ref="commandExecutorBean"/>
</blueprint>
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java
index 4f97a97..8670ada 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/helper/Util.java
@@ -24,49 +24,76 @@
package org.openecomp.appc.executionqueue.helper;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
import org.openecomp.appc.configuration.Configuration;
import org.openecomp.appc.configuration.ConfigurationFactory;
+import org.openecomp.appc.executionqueue.impl.QueueManager;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
public class Util {
- private static final Configuration configuration = ConfigurationFactory.getConfiguration();
+ private final EELFLogger logger = EELFManager.getInstance().getLogger(Util.class);
+ private final int default_queue_size = 10;
+ private final int default_threadpool_size = 10;
+ private final String queue_size_key = "appc.dispatcher.executionqueue.backlog.size";
+ private final String threadpool_size_key = "appc.dispatcher.executionqueue.threadpool.size";
- public static int DEFAULT_QUEUE_SIZE = 10;
- public static int DEFAULT_THREADPOOL_SIZE = 10;
+ private Configuration configuration;
- public static int getExecutionQueSize(){
- String sizeStr = configuration.getProperty("appc.dispatcher.executionqueue.backlog.size", String.valueOf(DEFAULT_QUEUE_SIZE));
- int size = DEFAULT_QUEUE_SIZE;
- try{
+ /**
+ * Initialization.
+ * <p>Used by blueprint.
+ */
+ public void init() {
+
+ configuration = ConfigurationFactory.getConfiguration();
+ }
+
+ public int getExecutionQueueSize() {
+ String sizeStr = configuration.getProperty(queue_size_key, String.valueOf(default_queue_size));
+
+ int size = default_queue_size;
+ try {
size = Integer.parseInt(sizeStr);
+ } catch (NumberFormatException e) {
+ logger.error("Error while parse key:" + queue_size_key + " got from configuration " + e.getMessage(), e);
}
- catch (NumberFormatException e){
- }
return size;
}
- public static int getThreadPoolSize(){
- String sizeStr = configuration.getProperty("appc.dispatcher.executionqueue.threadpool.size", String.valueOf(DEFAULT_THREADPOOL_SIZE));
- int size = DEFAULT_THREADPOOL_SIZE;
- try{
- size = Integer.parseInt(sizeStr);
- }
- catch (NumberFormatException e){
+ public int getThreadPoolSize() {
+ String sizeStr = configuration.getProperty(threadpool_size_key, String.valueOf(default_threadpool_size));
+ int size = default_threadpool_size;
+ try {
+ size = Integer.parseInt(sizeStr);
+ } catch (NumberFormatException e) {
+ logger.error("Error while parse key:" + threadpool_size_key + " got from configuration "
+ + e.getMessage(), e);
}
+
return size;
}
- public static ThreadFactory getThreadFactory(final boolean isDaemon){
+ public ThreadFactory getThreadFactory(final boolean isDaemon, final String threadNamePrefix) {
return new ThreadFactory() {
- final ThreadFactory factory = Executors.defaultThreadFactory();
+ private final String THREAD_NAME_PATTERN = "%s-%d";
+ private final ThreadFactory factory = Executors.defaultThreadFactory();
+ private final AtomicInteger counter = new AtomicInteger();
+
public Thread newThread(Runnable r) {
Thread t = factory.newThread(r);
t.setDaemon(isDaemon);
+ if (threadNamePrefix != null && !threadNamePrefix.isEmpty()) {
+ final String threadName = String.format(THREAD_NAME_PATTERN, threadNamePrefix, counter
+ .incrementAndGet());
+ t.setName(threadName);
+ }
return t;
}
};
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java
index 3092bd8..c29078c 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/ExecutionQueueServiceImpl.java
@@ -24,51 +24,54 @@
package org.openecomp.appc.executionqueue.impl;
-import java.time.Instant;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.concurrent.TimeUnit;
-
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
import org.openecomp.appc.exceptions.APPCException;
import org.openecomp.appc.executionqueue.ExecutionQueueService;
import org.openecomp.appc.executionqueue.MessageExpirationListener;
import org.openecomp.appc.executionqueue.impl.object.QueueMessage;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
+import java.time.Instant;
+import java.util.concurrent.TimeUnit;
public class ExecutionQueueServiceImpl<M extends Runnable> implements ExecutionQueueService<M> {
private static final EELFLogger logger =
- EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class);
+ EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class);
- ExecutionQueueServiceImpl(){
+ private QueueManager queueManager;
+ public ExecutionQueueServiceImpl() {
+ //do nothing
+ }
+
+ /**
+ * Injected by blueprint
+ *
+ * @param queueManager queue manager to be set
+ */
+ public void setQueueManager(QueueManager queueManager) {
+ this.queueManager = queueManager;
}
@Override
public void putMessage(M message) throws APPCException {
- this.putMessage(message,-1,null);
+ this.putMessage(message, -1, null);
}
@Override
- public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException{
- try {
- Instant expirationTime = calculateExpirationTime(timeout,unit);
- QueueManager queueManager = QueueManager.getInstance();
- boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<M>(message,expirationTime));
- if(!enqueueTask){
- throw new APPCException("failed to put message in queue");
- }
- } catch (Exception e) {
- logger.error("Error in putMessage method of ExecutionQueueServiceImpl" + e.getMessage());
- throw new APPCException(e);
+ public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException {
+ Instant expirationTime = calculateExpirationTime(timeout, unit);
+ boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<>(message, expirationTime));
+ if (!enqueueTask) {
+ logger.error("Error in putMessage method of ExecutionQueueServiceImpl");
+ throw new APPCException("Failed to put message in queue");
}
}
@Override
public void registerMessageExpirationListener(MessageExpirationListener listener) {
- QueueManager.getInstance().setListener(listener);
+ queueManager.setListener(listener);
}
private Instant calculateExpirationTime(long timeToLive, TimeUnit unit) {
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java
index 11d0b8d..b78f399 100644
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/main/java/org/openecomp/appc/executionqueue/impl/QueueManager.java
@@ -24,82 +24,87 @@
package org.openecomp.appc.executionqueue.impl;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
import org.openecomp.appc.executionqueue.MessageExpirationListener;
import org.openecomp.appc.executionqueue.helper.Util;
import org.openecomp.appc.executionqueue.impl.object.QueueMessage;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
public class QueueManager {
- private LinkedBlockingQueue<QueueMessage<? extends Runnable>> queue;
+ private final EELFLogger logger = EELFManager.getInstance().getLogger(QueueManager.class);
private MessageExpirationListener listener;
-
- private static int MAX_QUEUE_SIZE = Util.getExecutionQueSize();
-
- private static int MAX_THREAD_SIZE = Util.getThreadPoolSize();
-
private ExecutorService messageExecutor;
+ private int max_thread_size;
+ private int max_queue_size;
+ private Util executionQueueUtil;
- private static final EELFLogger logger =
- EELFManager.getInstance().getLogger(QueueManager.class);
-
- private QueueManager(){
- init();
+ public QueueManager() {
+ //do nothing
}
- private static class QueueManagerHolder {
- private static final QueueManager INSTANCE = new QueueManager();
+ /**
+ * Initialization method used by blueprint
+ */
+ public void init() {
+ max_thread_size = executionQueueUtil.getThreadPoolSize();
+ max_queue_size = executionQueueUtil.getExecutionQueueSize();
+ messageExecutor = new ThreadPoolExecutor(
+ max_thread_size,
+ max_thread_size,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue(max_queue_size),
+ executionQueueUtil.getThreadFactory(true, "appc-dispatcher"),
+ new ThreadPoolExecutor.AbortPolicy());
}
- public static QueueManager getInstance() {
- return QueueManagerHolder.INSTANCE;
- }
-
- private void init(){
- queue = new LinkedBlockingQueue<QueueMessage<? extends Runnable>>(MAX_QUEUE_SIZE);
- messageExecutor = Executors.newFixedThreadPool(MAX_THREAD_SIZE,Util.getThreadFactory(true));
-
- for(int i=0;i<MAX_THREAD_SIZE;i++){
- messageExecutor.submit(new Runnable() {
- @Override
- public void run() {
- while (true){
- try{
- QueueMessage<? extends Runnable> queueMessage = queue.take();
- if (queueMessage.isExpired()) {
- logger.debug("Message expired "+ queueMessage.getMessage());
- if(listener != null){
- listener.onMessageExpiration(queueMessage.getMessage());
- }
- else{
- logger.warn("Listener not available for expired message ");
- }
- }
- else{
- queueMessage.getMessage().run();
- }
- } catch (Exception e) {
- logger.error("Error in startMessagePolling method of ExecutionQueueServiceImpl" + e.getMessage());
- }
- }
- }
- });
- }
+ /**
+ * Destory method used by blueprint
+ */
+ public void stop() {
+ messageExecutor.shutdownNow();
}
public void setListener(MessageExpirationListener listener) {
this.listener = listener;
}
- public boolean enqueueTask(QueueMessage<? extends Runnable> queueMessage) {
- return queue.offer(queueMessage);
+ /**
+ * Injected by blueprint
+ *
+ * @param executionQueueUtil Util to be set
+ */
+ public void setExecutionQueueUtil(Util executionQueueUtil) {
+ this.executionQueueUtil = executionQueueUtil;
}
+ public boolean enqueueTask(QueueMessage queueMessage) {
+ boolean isEnqueued = true;
+ try {
+ messageExecutor.execute(() -> {
+ if (queueMessage.isExpired()) {
+ logger.debug("Message expired " + queueMessage.getMessage());
+ if (listener != null) {
+ listener.onMessageExpiration(queueMessage.getMessage());
+ } else {
+ logger.warn("Listener not available for expired message ");
+ }
+ } else {
+ queueMessage.getMessage().run();
+ }
+ });
+ } catch (RejectedExecutionException ree) {
+ isEnqueued = false;
+ }
+
+ return isEnqueued;
+ }
}
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/ExecutionQueueServiceTest.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/ExecutionQueueServiceTest.java
new file mode 100644
index 0000000..067b6c3
--- /dev/null
+++ b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/ExecutionQueueServiceTest.java
@@ -0,0 +1,68 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP : APPC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2017 Amdocs
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.appc.executionqueue;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.openecomp.appc.exceptions.APPCException;
+import org.openecomp.appc.executionqueue.helper.Util;
+import org.openecomp.appc.executionqueue.impl.ExecutionQueueServiceImpl;
+import org.openecomp.appc.executionqueue.impl.QueueManager;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+
+@RunWith(PowerMockRunner.class)
+public class ExecutionQueueServiceTest {
+
+ @InjectMocks
+ private ExecutionQueueServiceImpl service;
+ @Spy
+ private QueueManager queueManager = new QueueManager();
+ @Spy
+ private Util executionQueueUtil = new Util();
+
+ @Before
+ public void setup() {
+ Mockito.doReturn(true).when(queueManager).enqueueTask(any());
+ }
+
+ @Test
+ public void testPositiveFlow() {
+ Message message = new Message();
+ try {
+ service.putMessage(message);
+ Mockito.verify(queueManager, times(1)).enqueueTask(any());
+ } catch (APPCException e) {
+ Assert.fail(e.toString());
+ }
+ }
+}
diff --git a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/TestExecutionQueueService.java b/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/TestExecutionQueueService.java
deleted file mode 100644
index 6e95848..0000000
--- a/appc-dispatcher/appc-dispatcher-common/execution-queue-management-lib/src/test/java/org/openecomp/appc/executionqueue/TestExecutionQueueService.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP : APPC
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Copyright (C) 2017 Amdocs
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * ============LICENSE_END=========================================================
- */
-
-package org.openecomp.appc.executionqueue;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.openecomp.appc.exceptions.APPCException;
-import org.openecomp.appc.executionqueue.ExecutionQueueService;
-import org.openecomp.appc.executionqueue.impl.ExecutionQueueServiceFactory;
-import org.powermock.api.mockito.PowerMockito;
-
-import java.util.concurrent.TimeUnit;
-
-
-public class TestExecutionQueueService {
-
- @Test
- public void testPositiveFlow(){
- Message message = new Message();
- ExecutionQueueService service = ExecutionQueueServiceFactory.getExecutionQueueService();
- try {
- service.putMessage(message);
- waitFor(5000);
- Assert.assertTrue(message.isRunExecuted());
- } catch (APPCException e) {
- Assert.fail(e.toString());
- }
- }
-
-// @Test
- public void testTimeout(){
- ExecutionQueueService service = ExecutionQueueServiceFactory.getExecutionQueueService();
- Message message = new Message();
- Listener listener = new Listener();
- service.registerMessageExpirationListener(listener);
- try {
- service.putMessage(message,1, TimeUnit.MILLISECONDS);
- waitFor(5000);
- Assert.assertTrue(listener.isListenerExecuted());
- } catch (APPCException e) {
- e.printStackTrace();
- }
- }
-
- private void waitFor(long milliSeconds){
- try {
- Thread.sleep(milliSeconds);
- } catch (InterruptedException e) {
- Assert.fail(e.toString());
- }
- }
-}