Merge "Add multiple messages support in Intermediary"
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java
index 6ca4d3c..48b60dc 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java
@@ -37,7 +37,7 @@
      * @param automationCompositionElementId the ID of the automation composition element
      * @throws PfModelException in case of a model exception
      */
-    public void undeploy(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+    void undeploy(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
 
     /**
      * Handle an update on a automation composition element.
@@ -47,20 +47,20 @@
      * @param properties properties Map
      * @throws PfModelException from Policy framework
      */
-    public void deploy(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
+    void deploy(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
             throws PfModelException;
 
-    public void lock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+    void lock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
 
-    public void unlock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+    void unlock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
 
-    public void delete(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+    void delete(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
 
-    public void update(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
+    void update(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
             throws PfModelException;
 
-    public void prime(UUID compositionId, List<AutomationCompositionElementDefinition> elementDefinitionList)
+    void prime(UUID compositionId, List<AutomationCompositionElementDefinition> elementDefinitionList)
             throws PfModelException;
 
-    public void deprime(UUID compositionId) throws PfModelException;
+    void deprime(UUID compositionId) throws PfModelException;
 }
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java
index 7e71365..71c9d9a 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java
@@ -24,13 +24,13 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.UUID;
-import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
 import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
 import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
 import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantDeploy;
 import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
+import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
 import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCompositionDeploy;
 import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCompositionDeployAck;
 import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCompositionStateChange;
@@ -39,7 +39,6 @@
 import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
 import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder;
 import org.onap.policy.clamp.models.acm.persistence.provider.AcInstanceStateResolver;
-import org.onap.policy.models.base.PfModelException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -53,7 +52,7 @@
 
     private final CacheProvider cacheProvider;
     private final ParticipantMessagePublisher publisher;
-    private final AutomationCompositionElementListener listener;
+    private final ThreadHandler listener;
     private final AcInstanceStateResolver acInstanceStateResolver;
 
     /**
@@ -64,7 +63,7 @@
      * @param listener the ThreadHandler Listener
      */
     public AutomationCompositionHandler(CacheProvider cacheProvider, ParticipantMessagePublisher publisher,
-            AutomationCompositionElementListener listener) {
+            ThreadHandler listener) {
         this.cacheProvider = cacheProvider;
         this.publisher = publisher;
         this.listener = listener;
@@ -84,17 +83,20 @@
         var automationComposition = cacheProvider.getAutomationComposition(stateChangeMsg.getAutomationCompositionId());
 
         if (automationComposition == null) {
-            var automationCompositionAck =
-                    new AutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK);
-            automationCompositionAck.setParticipantId(cacheProvider.getParticipantId());
-            automationCompositionAck.setMessage("Automation composition " + stateChangeMsg.getAutomationCompositionId()
-                    + " does not use this participant " + cacheProvider.getParticipantId());
-            automationCompositionAck.setResult(false);
-            automationCompositionAck.setResponseTo(stateChangeMsg.getMessageId());
-            automationCompositionAck.setAutomationCompositionId(stateChangeMsg.getAutomationCompositionId());
-            publisher.sendAutomationCompositionAck(automationCompositionAck);
-            LOGGER.debug("Automation composition {} does not use this participant",
-                    stateChangeMsg.getAutomationCompositionId());
+            if (DeployOrder.DELETE.equals(stateChangeMsg.getDeployOrderedState())) {
+                var automationCompositionAck = new AutomationCompositionDeployAck(
+                        ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK);
+                automationCompositionAck.setParticipantId(cacheProvider.getParticipantId());
+                automationCompositionAck.setMessage("Already deleted or never used");
+                automationCompositionAck.setResult(true);
+                automationCompositionAck.setStateChangeResult(StateChangeResult.NO_ERROR);
+                automationCompositionAck.setResponseTo(stateChangeMsg.getMessageId());
+                automationCompositionAck.setAutomationCompositionId(stateChangeMsg.getAutomationCompositionId());
+                publisher.sendAutomationCompositionAck(automationCompositionAck);
+            } else {
+                LOGGER.debug("Automation composition {} does not use this participant",
+                        stateChangeMsg.getAutomationCompositionId());
+            }
             return;
         }
 
@@ -106,18 +108,18 @@
         }
 
         if (DeployOrder.NONE.equals(stateChangeMsg.getDeployOrderedState())) {
-            handleLockOrderState(automationComposition, stateChangeMsg.getLockOrderedState(),
-                    stateChangeMsg.getStartPhase());
+            handleLockOrderState(stateChangeMsg.getMessageId(), automationComposition,
+                    stateChangeMsg.getLockOrderedState(), stateChangeMsg.getStartPhase());
         } else {
-            handleDeployOrderState(automationComposition, stateChangeMsg.getDeployOrderedState(),
-                    stateChangeMsg.getStartPhase());
+            handleDeployOrderState(stateChangeMsg.getMessageId(), automationComposition,
+                    stateChangeMsg.getDeployOrderedState(), stateChangeMsg.getStartPhase());
         }
     }
 
     private boolean checkConsistantOrderState(AutomationComposition automationComposition, DeployOrder deployOrder,
             LockOrder lockOrder) {
         if (DeployOrder.UPDATE.equals(deployOrder)) {
-            return false;
+            return true;
         }
         return acInstanceStateResolver.resolve(deployOrder, lockOrder, automationComposition.getDeployState(),
                 automationComposition.getLockState(), automationComposition.getStateChangeResult()) != null;
@@ -126,19 +128,20 @@
     /**
      * Method to handle state changes.
      *
+     * @param messageId the ID of the state change message being handled
      * @param automationComposition participant response
      * @param orderedState automation composition ordered state
      * @param startPhaseMsg startPhase from message
      */
-    private void handleDeployOrderState(final AutomationComposition automationComposition, DeployOrder orderedState,
-            Integer startPhaseMsg) {
+    private void handleDeployOrderState(UUID messageId, final AutomationComposition automationComposition,
+            DeployOrder orderedState, Integer startPhaseMsg) {
 
         switch (orderedState) {
             case UNDEPLOY:
-                handleUndeployState(automationComposition, startPhaseMsg);
+                handleUndeployState(messageId, automationComposition, startPhaseMsg);
                 break;
             case DELETE:
-                handleDeleteState(automationComposition, startPhaseMsg);
+                handleDeleteState(messageId, automationComposition, startPhaseMsg);
                 break;
 
             default:
@@ -150,19 +153,20 @@
     /**
      * Method to handle state changes.
      *
+     * @param messageId the ID of the state change message being handled
      * @param automationComposition participant response
      * @param orderedState automation composition ordered state
      * @param startPhaseMsg startPhase from message
      */
-    private void handleLockOrderState(final AutomationComposition automationComposition, LockOrder orderedState,
-            Integer startPhaseMsg) {
+    private void handleLockOrderState(UUID messageId, final AutomationComposition automationComposition,
+            LockOrder orderedState, Integer startPhaseMsg) {
 
         switch (orderedState) {
             case LOCK:
-                handleLockState(automationComposition, startPhaseMsg);
+                handleLockState(messageId, automationComposition, startPhaseMsg);
                 break;
             case UNLOCK:
-                handleUnlockState(automationComposition, startPhaseMsg);
+                handleUnlockState(messageId, automationComposition, startPhaseMsg);
                 break;
             default:
                 LOGGER.error("StateChange message has no state, state is null {}", automationComposition.getKey());
@@ -188,7 +192,7 @@
 
                 updateExistingElementsOnThisParticipant(updateMsg.getAutomationCompositionId(), participantDeploy);
 
-                callParticipantUpdateProperty(participantDeploy.getAcElementList(),
+                callParticipantUpdateProperty(updateMsg.getMessageId(), participantDeploy.getAcElementList(),
                         updateMsg.getAutomationCompositionId());
             }
         }
@@ -212,35 +216,28 @@
                     cacheProvider.initializeAutomationComposition(deployMsg.getCompositionId(),
                             deployMsg.getAutomationCompositionId(), participantDeploy);
                 }
-                callParticipanDeploy(participantDeploy.getAcElementList(), deployMsg.getStartPhase(),
-                        deployMsg.getAutomationCompositionId());
+                callParticipanDeploy(deployMsg.getMessageId(), participantDeploy.getAcElementList(),
+                        deployMsg.getStartPhase(), deployMsg.getAutomationCompositionId());
             }
         }
     }
 
-    private void callParticipanDeploy(List<AcElementDeploy> acElements, Integer startPhaseMsg, UUID instanceId) {
-        try {
-            for (var element : acElements) {
-                var commonProperties = cacheProvider.getCommonProperties(instanceId, element.getId());
-                int startPhase = ParticipantUtils.findStartPhase(commonProperties);
-                if (startPhaseMsg.equals(startPhase)) {
-                    var map = new HashMap<>(commonProperties);
-                    map.putAll(element.getProperties());
-                    listener.deploy(instanceId, element, map);
-                }
+    private void callParticipanDeploy(UUID messageId, List<AcElementDeploy> acElements, Integer startPhaseMsg,
+            UUID instanceId) {
+        for (var element : acElements) {
+            var commonProperties = cacheProvider.getCommonProperties(instanceId, element.getId());
+            int startPhase = ParticipantUtils.findStartPhase(commonProperties);
+            if (startPhaseMsg.equals(startPhase)) {
+                var map = new HashMap<>(commonProperties);
+                map.putAll(element.getProperties());
+                listener.deploy(messageId, instanceId, element, map);
             }
-        } catch (PfModelException e) {
-            LOGGER.debug("Automation composition element Deploy failed {}", instanceId);
         }
     }
 
-    private void callParticipantUpdateProperty(List<AcElementDeploy> acElements, UUID instanceId) {
-        try {
-            for (var element : acElements) {
-                listener.update(instanceId, element, element.getProperties());
-            }
-        } catch (PfModelException e) {
-            LOGGER.debug("Automation composition element update failed {}", instanceId);
+    private void callParticipantUpdateProperty(UUID messageId, List<AcElementDeploy> acElements, UUID instanceId) {
+        for (var element : acElements) {
+            listener.update(messageId, instanceId, element, element.getProperties());
         }
     }
 
@@ -255,101 +252,86 @@
     /**
      * Method to handle when the new state from participant is UNINITIALISED state.
      *
+     * @param messageId the ID of the message requesting the undeploy, forwarded to the listener
      * @param automationComposition participant response
      * @param startPhaseMsg startPhase from message
      */
-    private void handleUndeployState(final AutomationComposition automationComposition, Integer startPhaseMsg) {
-        try {
-            for (var acElement : automationComposition.getElements().values()) {
-                int startPhase = ParticipantUtils.findStartPhase(
-                        cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
-                if (startPhaseMsg.equals(startPhase)) {
-                    listener.undeploy(automationComposition.getInstanceId(), acElement.getId());
-                }
+    private void handleUndeployState(UUID messageId, final AutomationComposition automationComposition,
+            Integer startPhaseMsg) {
+        for (var acElement : automationComposition.getElements().values()) {
+            int startPhase = ParticipantUtils.findStartPhase(
+                    cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
+            if (startPhaseMsg.equals(startPhase)) {
+                listener.undeploy(messageId, automationComposition.getInstanceId(), acElement.getId());
             }
-        } catch (PfModelException e) {
-            LOGGER.debug("Automation composition element Undeploy failed {}", automationComposition.getInstanceId());
         }
     }
 
-    private void handleDeleteState(final AutomationComposition automationComposition, Integer startPhaseMsg) {
-        try {
-            for (var acElement : automationComposition.getElements().values()) {
-                int startPhase = ParticipantUtils.findStartPhase(
-                        cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
-                if (startPhaseMsg.equals(startPhase)) {
-                    listener.delete(automationComposition.getInstanceId(), acElement.getId());
-                }
+    private void handleDeleteState(UUID messageId, final AutomationComposition automationComposition,
+            Integer startPhaseMsg) {
+        for (var acElement : automationComposition.getElements().values()) {
+            int startPhase = ParticipantUtils.findStartPhase(
+                    cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
+            if (startPhaseMsg.equals(startPhase)) {
+                listener.delete(messageId, automationComposition.getInstanceId(), acElement.getId());
             }
-        } catch (PfModelException e) {
-            LOGGER.debug("Automation composition element Delete failed {}", automationComposition.getInstanceId());
         }
     }
 
     /**
      * Method to handle when the new state from participant is PASSIVE state.
      *
+     * @param messageId the ID of the message requesting the lock, forwarded to the listener
      * @param automationComposition participant response
      * @param startPhaseMsg startPhase from message
      */
-    private void handleLockState(final AutomationComposition automationComposition, Integer startPhaseMsg) {
-        try {
-            for (var acElement : automationComposition.getElements().values()) {
-                int startPhase = ParticipantUtils.findStartPhase(
-                        cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
-                if (startPhaseMsg.equals(startPhase)) {
-                    listener.lock(automationComposition.getInstanceId(), acElement.getId());
-                }
+    private void handleLockState(UUID messageId, final AutomationComposition automationComposition,
+            Integer startPhaseMsg) {
+        for (var acElement : automationComposition.getElements().values()) {
+            int startPhase = ParticipantUtils.findStartPhase(
+                    cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
+            if (startPhaseMsg.equals(startPhase)) {
+                listener.lock(messageId, automationComposition.getInstanceId(), acElement.getId());
             }
-        } catch (PfModelException e) {
-            LOGGER.debug("Automation composition element Lock failed {}", automationComposition.getInstanceId());
         }
     }
 
     /**
      * Method to handle when the new state from participant is RUNNING state.
      *
+     * @param messageId the ID of the message requesting the unlock, forwarded to the listener
      * @param automationComposition participant response
      * @param startPhaseMsg startPhase from message
      */
-    private void handleUnlockState(final AutomationComposition automationComposition, Integer startPhaseMsg) {
-        try {
-            for (var acElement : automationComposition.getElements().values()) {
-                int startPhase = ParticipantUtils.findStartPhase(
-                        cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
-                if (startPhaseMsg.equals(startPhase)) {
-                    listener.unlock(automationComposition.getInstanceId(), acElement.getId());
-                }
+    private void handleUnlockState(UUID messageId, final AutomationComposition automationComposition,
+            Integer startPhaseMsg) {
+        for (var acElement : automationComposition.getElements().values()) {
+            int startPhase = ParticipantUtils.findStartPhase(
+                    cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
+            if (startPhaseMsg.equals(startPhase)) {
+                listener.unlock(messageId, automationComposition.getInstanceId(), acElement.getId());
             }
-        } catch (PfModelException e) {
-            LOGGER.debug("Automation composition element Unlock failed {}", automationComposition.getInstanceId());
         }
     }
 
     /**
      * Handles prime a Composition Definition.
      *
+     * @param messageId the ID of the message requesting the prime, forwarded to the listener
      * @param compositionId the compositionId
      * @param list the list of AutomationCompositionElementDefinition
      */
-    public void prime(UUID compositionId, List<AutomationCompositionElementDefinition> list) {
-        try {
-            listener.prime(compositionId, list);
-        } catch (PfModelException e) {
-            LOGGER.debug("Composition prime failed {}", compositionId);
-        }
+    public void prime(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list) {
+        listener.prime(messageId, compositionId, list);
     }
 
     /**
      * Handles deprime a Composition Definition.
      *
+     * @param messageId the ID of the message requesting the deprime, forwarded to the listener
      * @param compositionId the compositionId
      */
-    public void deprime(UUID compositionId) {
-        try {
-            listener.deprime(compositionId);
-        } catch (PfModelException e) {
-            LOGGER.debug("Composition deprime failed {}", compositionId);
-        }
+    public void deprime(UUID messageId, UUID compositionId) {
+        listener.deprime(messageId, compositionId);
     }
 }
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java
index 2c34652..cfd61c4 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java
@@ -99,6 +99,7 @@
                 new AutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK);
         automationCompositionStateChangeAck.setParticipantId(cacheProvider.getParticipantId());
         automationCompositionStateChangeAck.setMessage(message);
+        automationCompositionStateChangeAck.setResponseTo(cacheProvider.getMsgIdentification().get(element.getId()));
         automationCompositionStateChangeAck.setStateChangeResult(stateChangeResult);
         automationCompositionStateChangeAck.setAutomationCompositionId(automationCompositionId);
         automationCompositionStateChangeAck.getAutomationCompositionResultMap().put(element.getId(),
@@ -107,6 +108,7 @@
         LOGGER.debug("Automation composition element {} state changed to {}", elementId, deployState);
         automationCompositionStateChangeAck.setResult(true);
         publisher.sendAutomationCompositionAck(automationCompositionStateChangeAck);
+        cacheProvider.getMsgIdentification().remove(element.getId());
     }
 
     private void handleDeployState(AutomationComposition automationComposition, AutomationCompositionElement element,
@@ -213,10 +215,12 @@
         participantPrimeAck.setCompositionId(compositionId);
         participantPrimeAck.setMessage(message);
         participantPrimeAck.setResult(true);
+        participantPrimeAck.setResponseTo(cacheProvider.getMsgIdentification().get(compositionId));
         participantPrimeAck.setCompositionState(state);
         participantPrimeAck.setStateChangeResult(stateChangeResult);
         participantPrimeAck.setParticipantId(cacheProvider.getParticipantId());
         participantPrimeAck.setState(ParticipantState.ON_LINE);
         publisher.sendParticipantPrimeAck(participantPrimeAck);
+        cacheProvider.getMsgIdentification().remove(compositionId);
     }
 }
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
index 09e75e8..119cc11 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
@@ -55,6 +55,9 @@
     private final Map<UUID, Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition>> acElementsDefinitions =
             new ConcurrentHashMap<>();
 
+    @Getter
+    private final Map<UUID, UUID> msgIdentification = new ConcurrentHashMap<>();
+
     /**
      * Constructor.
      *
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
index 0e7e193..3a3a0cc 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
@@ -186,11 +186,13 @@
                 }
             }
             cacheProvider.addElementDefinition(participantPrimeMsg.getCompositionId(), list);
-            automationCompositionHandler.prime(participantPrimeMsg.getCompositionId(), list);
+            automationCompositionHandler.prime(participantPrimeMsg.getMessageId(),
+                    participantPrimeMsg.getCompositionId(), list);
         } else {
             // deprime
             cacheProvider.removeElementDefinition(participantPrimeMsg.getCompositionId());
-            automationCompositionHandler.deprime(participantPrimeMsg.getCompositionId());
+            automationCompositionHandler.deprime(participantPrimeMsg.getMessageId(),
+                    participantPrimeMsg.getCompositionId());
         }
     }
 
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java
new file mode 100644
index 0000000..b5866d7
--- /dev/null
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java
@@ -0,0 +1,275 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.handler;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import lombok.RequiredArgsConstructor;
+import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
+import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
+import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
+import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
+import org.onap.policy.clamp.models.acm.concepts.DeployState;
+import org.onap.policy.clamp.models.acm.concepts.LockState;
+import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
+import org.onap.policy.models.base.PfModelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+@Component
+@RequiredArgsConstructor
+public class ThreadHandler implements Closeable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class);
+
+    private final AutomationCompositionElementListener listener;
+    private final ParticipantIntermediaryApi intermediaryApi;
+    private final CacheProvider cacheProvider;
+
+    private final Map<UUID, Future<?>> executionMap = new ConcurrentHashMap<>(); // latest execution per id
+
+    private final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+
+    /**
+     * Deploy an automation composition element on a thread of the executor.
+     *
+     * @param messageId the message id to record for the element
+     * @param instanceId the automation composition instance id
+     * @param element the deploy information of the automation composition element
+     * @param properties the properties map handed to the listener
+     */
+    public void deploy(UUID messageId, UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
+        cleanExecution(element.getId(), messageId);
+        final Runnable task = () -> this.deployProcess(instanceId, element, properties);
+        executionMap.put(element.getId(), executor.submit(task));
+    }
+
+    private void deployProcess(UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
+        try {
+            listener.deploy(instanceId, element, properties);
+        } catch (PfModelException e) {
+            LOGGER.error("Automation composition element deploy failed {} {}", instanceId, e.getMessage(), e);
+            intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.UNDEPLOYED,
+                    null, StateChangeResult.FAILED, "Automation composition element deploy failed");
+        }
+        executionMap.remove(element.getId()); // execution finished: forget its Future
+    }
+
+    /**
+     * Undeploy an automation composition element on a thread of the executor.
+     *
+     * @param messageId the message id to record for the element
+     * @param instanceId the automation composition instance id
+     * @param elementId the id of the automation composition element
+     */
+    public void undeploy(UUID messageId, UUID instanceId, UUID elementId) {
+        cleanExecution(elementId, messageId);
+        final Runnable task = () -> this.undeployProcess(instanceId, elementId);
+        executionMap.put(elementId, executor.submit(task));
+    }
+
+    private void undeployProcess(UUID instanceId, UUID elementId) {
+        try {
+            listener.undeploy(instanceId, elementId);
+        } catch (PfModelException e) {
+            LOGGER.error("Automation composition element undeploy failed {} {}", instanceId, e.getMessage(), e);
+            intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.DEPLOYED, null,
+                    StateChangeResult.FAILED, "Automation composition element undeploy failed");
+        }
+        executionMap.remove(elementId); // execution finished: forget its Future
+    }
+
+    /**
+     * Lock an automation composition element on a thread of the executor.
+     *
+     * @param messageId the message id to record for the element
+     * @param instanceId the automation composition instance id
+     * @param elementId the id of the automation composition element
+     */
+    public void lock(UUID messageId, UUID instanceId, UUID elementId) {
+        cleanExecution(elementId, messageId);
+        final Runnable task = () -> this.lockProcess(instanceId, elementId);
+        executionMap.put(elementId, executor.submit(task));
+    }
+
+    private void lockProcess(UUID instanceId, UUID elementId) {
+        try {
+            listener.lock(instanceId, elementId);
+        } catch (PfModelException e) {
+            LOGGER.error("Automation composition element lock failed {} {}", instanceId, e.getMessage(), e);
+            intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.UNLOCKED,
+                    StateChangeResult.FAILED, "Automation composition element lock failed");
+        }
+        executionMap.remove(elementId); // execution finished: forget its Future
+    }
+
+    /**
+     * Unlock an automation composition element on a thread of the executor.
+     *
+     * @param messageId the message id to record for the element
+     * @param instanceId the automation composition instance id
+     * @param elementId the id of the automation composition element
+     */
+    public void unlock(UUID messageId, UUID instanceId, UUID elementId) {
+        cleanExecution(elementId, messageId);
+        final Runnable task = () -> this.unlockProcess(instanceId, elementId);
+        executionMap.put(elementId, executor.submit(task));
+    }
+
+    private void unlockProcess(UUID instanceId, UUID elementId) {
+        try {
+            listener.unlock(instanceId, elementId);
+        } catch (PfModelException e) {
+            LOGGER.error("Automation composition element unlock failed {} {}", instanceId, e.getMessage(), e);
+            intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.LOCKED,
+                    StateChangeResult.FAILED, "Automation composition element unlock failed");
+        }
+        executionMap.remove(elementId); // execution finished: forget its Future
+    }
+
+    /**
+     * Delete an automation composition element on a thread of the executor.
+     *
+     * @param messageId the message id to record for the element
+     * @param instanceId the automation composition instance id
+     * @param elementId the id of the automation composition element
+     */
+    public void delete(UUID messageId, UUID instanceId, UUID elementId) {
+        cleanExecution(elementId, messageId);
+        final Runnable task = () -> this.deleteProcess(instanceId, elementId);
+        executionMap.put(elementId, executor.submit(task));
+    }
+
+    private void deleteProcess(UUID instanceId, UUID elementId) {
+        try {
+            listener.delete(instanceId, elementId);
+        } catch (PfModelException e) {
+            LOGGER.error("Automation composition element delete failed {} {}", instanceId, e.getMessage(), e);
+            intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.UNDEPLOYED, null,
+                    StateChangeResult.FAILED, "Automation composition element delete failed");
+        }
+        executionMap.remove(elementId); // execution finished: forget its Future
+    }
+
+    /**
+     * Update the properties of an automation composition element on a thread of the executor.
+     *
+     * @param messageId the message id to record for the element
+     * @param instanceId the automation composition instance id
+     * @param element the deploy information of the automation composition element
+     * @param properties the properties map handed to the listener
+     */
+    public void update(UUID messageId, UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
+        cleanExecution(element.getId(), messageId);
+        final Runnable task = () -> this.updateProcess(instanceId, element, properties);
+        executionMap.put(element.getId(), executor.submit(task));
+    }
+
+    private void updateProcess(UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
+        try {
+            listener.update(instanceId, element, properties);
+        } catch (PfModelException e) {
+            LOGGER.error("Automation composition element update failed {} {}", instanceId, e.getMessage(), e);
+            intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.DEPLOYED,
+                    null, StateChangeResult.FAILED, "Automation composition element update failed");
+        }
+        executionMap.remove(element.getId()); // execution finished: forget its Future
+    }
+
+    private void cleanExecution(UUID execIdentificationId, UUID messageId) {
+        // Detach the previous execution with one atomic map operation, so the Future that
+        // gets cancelled is exactly the one removed (no window between get() and remove()).
+        var process = executionMap.remove(execIdentificationId);
+        if (process != null && !process.isDone()) {
+            process.cancel(true); // may interrupt the superseded execution if it is running
+        }
+        // Remember the message id that now applies to this element or composition id.
+        cacheProvider.getMsgIdentification().put(execIdentificationId, messageId);
+    }
+
+    /**
+     * Prime a composition definition on a thread of the executor.
+     *
+     * @param messageId the message id to record for the composition
+     * @param compositionId the id of the composition definition
+     * @param list the AutomationCompositionElementDefinition entries handed to the listener
+     */
+    public void prime(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list) {
+        cleanExecution(compositionId, messageId);
+        final Runnable task = () -> this.primeProcess(compositionId, list);
+        executionMap.put(compositionId, executor.submit(task));
+    }
+
+    private void primeProcess(UUID compositionId, List<AutomationCompositionElementDefinition> list) {
+        try {
+            listener.prime(compositionId, list);
+        } catch (PfModelException e) {
+            LOGGER.error("Composition Defintion prime failed {} {}", compositionId, e.getMessage(), e);
+            intermediaryApi.updateCompositionState(compositionId, AcTypeState.COMMISSIONED, StateChangeResult.FAILED,
+                    "Composition Defintion prime failed");
+        }
+        executionMap.remove(compositionId); // also after a failure, so no finished Future stays registered
+    }
+
+    /**
+     * Deprime a composition definition on a thread of the executor.
+     *
+     * @param messageId the message id to record for the composition
+     * @param compositionId the id of the composition definition
+     */
+    public void deprime(UUID messageId, UUID compositionId) {
+        cleanExecution(compositionId, messageId);
+        final Runnable task = () -> this.deprimeProcess(compositionId);
+        executionMap.put(compositionId, executor.submit(task));
+    }
+
+    private void deprimeProcess(UUID compositionId) {
+        try {
+            listener.deprime(compositionId);
+        } catch (PfModelException e) {
+            LOGGER.error("Composition Defintion deprime failed {} {}", compositionId, e.getMessage(), e);
+            intermediaryApi.updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.FAILED,
+                    "Composition Defintion deprime failed");
+        }
+        executionMap.remove(compositionId); // also after a failure, so no finished Future stays registered
+    }
+
+
+    /**
+     * Shuts down the executor that runs the listener operations. Only
+     * {@code shutdown()} is invoked, so this method does not cancel the
+     * executions that are still registered in the execution map.
+     *
+     * @throws IOException declared by {@link Closeable}; this implementation does not throw it
+     */
+    @Override
+    public void close() throws IOException {
+        executor.shutdown();
+    }
+}
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandlerTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandlerTest.java
index b4397b4..dd49ee3 100644
--- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandlerTest.java
+++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandlerTest.java
@@ -32,7 +32,6 @@
 import java.util.UUID;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
 import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
 import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
 import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
@@ -44,7 +43,6 @@
 import org.onap.policy.clamp.models.acm.messages.dmaap.participant.PropertiesUpdate;
 import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
 import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder;
-import org.onap.policy.models.base.PfModelException;
 import org.springframework.test.context.junit.jupiter.SpringExtension;
 
 @ExtendWith(SpringExtension.class)
@@ -54,13 +52,14 @@
     void handleAutomationCompositionStateChangeNullTest() {
         var participantMessagePublisher = mock(ParticipantMessagePublisher.class);
         var cacheProvider = mock(CacheProvider.class);
-        var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher,
-                mock(AutomationCompositionElementListener.class));
+        var ach =
+                new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, mock(ThreadHandler.class));
 
         var automationCompositionStateChange = new AutomationCompositionStateChange();
         assertDoesNotThrow(() -> ach.handleAutomationCompositionStateChange(automationCompositionStateChange));
 
         automationCompositionStateChange.setAutomationCompositionId(UUID.randomUUID());
+        automationCompositionStateChange.setDeployOrderedState(DeployOrder.DELETE);
         assertDoesNotThrow(() -> ach.handleAutomationCompositionStateChange(automationCompositionStateChange));
         verify(participantMessagePublisher).sendAutomationCompositionAck(any(AutomationCompositionDeployAck.class));
 
@@ -73,7 +72,7 @@
     }
 
     @Test
-    void handleAutomationCompositionStateChangeUndeployTest() throws PfModelException {
+    void handleAutomationCompositionStateChangeUndeployTest() {
         var automationComposition = CommonTestData.getTestAutomationCompositionMap().values().iterator().next();
         var automationCompositionStateChange = CommonTestData.getStateChange(CommonTestData.getParticipantId(),
                 automationComposition.getInstanceId(), DeployOrder.UNDEPLOY, LockOrder.NONE);
@@ -83,14 +82,14 @@
         when(cacheProvider.getCommonProperties(any(UUID.class), any(UUID.class))).thenReturn(Map.of());
 
         var participantMessagePublisher = mock(ParticipantMessagePublisher.class);
-        var listener = mock(AutomationCompositionElementListener.class);
+        var listener = mock(ThreadHandler.class);
         var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener);
         ach.handleAutomationCompositionStateChange(automationCompositionStateChange);
-        verify(listener, times(automationComposition.getElements().size())).undeploy(any(), any());
+        verify(listener, times(automationComposition.getElements().size())).undeploy(any(), any(), any());
     }
 
     @Test
-    void handleAutomationCompositionStateChangeLockTest() throws PfModelException {
+    void handleAutomationCompositionStateChangeLockTest() {
         var automationComposition = CommonTestData.getTestAutomationCompositionMap().values().iterator().next();
         var automationCompositionStateChange = CommonTestData.getStateChange(CommonTestData.getParticipantId(),
                 automationComposition.getInstanceId(), DeployOrder.NONE, LockOrder.LOCK);
@@ -100,14 +99,14 @@
         when(cacheProvider.getCommonProperties(any(UUID.class), any(UUID.class))).thenReturn(Map.of());
 
         var participantMessagePublisher = mock(ParticipantMessagePublisher.class);
-        var listener = mock(AutomationCompositionElementListener.class);
+        var listener = mock(ThreadHandler.class);
         var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener);
         ach.handleAutomationCompositionStateChange(automationCompositionStateChange);
-        verify(listener, times(automationComposition.getElements().size())).lock(any(), any());
+        verify(listener, times(automationComposition.getElements().size())).lock(any(), any(), any());
     }
 
     @Test
-    void handleAutomationCompositionStateChangeUnlockTest() throws PfModelException {
+    void handleAutomationCompositionStateChangeUnlockTest() {
         var automationComposition = CommonTestData.getTestAutomationCompositionMap().values().iterator().next();
         var automationCompositionStateChange = CommonTestData.getStateChange(CommonTestData.getParticipantId(),
                 automationComposition.getInstanceId(), DeployOrder.NONE, LockOrder.UNLOCK);
@@ -117,14 +116,14 @@
         when(cacheProvider.getCommonProperties(any(UUID.class), any(UUID.class))).thenReturn(Map.of());
 
         var participantMessagePublisher = mock(ParticipantMessagePublisher.class);
-        var listener = mock(AutomationCompositionElementListener.class);
+        var listener = mock(ThreadHandler.class);
         var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener);
         ach.handleAutomationCompositionStateChange(automationCompositionStateChange);
-        verify(listener, times(automationComposition.getElements().size())).unlock(any(), any());
+        verify(listener, times(automationComposition.getElements().size())).unlock(any(), any(), any());
     }
 
     @Test
-    void handleAutomationCompositionStateChangeDeleteTest() throws PfModelException {
+    void handleAutomationCompositionStateChangeDeleteTest() {
         var automationComposition = CommonTestData.getTestAutomationCompositionMap().values().iterator().next();
         var automationCompositionStateChange = CommonTestData.getStateChange(CommonTestData.getParticipantId(),
                 automationComposition.getInstanceId(), DeployOrder.DELETE, LockOrder.NONE);
@@ -134,16 +133,16 @@
         when(cacheProvider.getCommonProperties(any(UUID.class), any(UUID.class))).thenReturn(Map.of());
 
         var participantMessagePublisher = mock(ParticipantMessagePublisher.class);
-        var listener = mock(AutomationCompositionElementListener.class);
+        var listener = mock(ThreadHandler.class);
         var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener);
         ach.handleAutomationCompositionStateChange(automationCompositionStateChange);
-        verify(listener, times(automationComposition.getElements().size())).delete(any(), any());
+        verify(listener, times(automationComposition.getElements().size())).delete(any(), any(), any());
     }
 
     @Test
-    void handleAcPropertyUpdateTest() throws PfModelException {
+    void handleAcPropertyUpdateTest() {
         var cacheProvider = mock(CacheProvider.class);
-        var listener = mock(AutomationCompositionElementListener.class);
+        var listener = mock(ThreadHandler.class);
         var participantMessagePublisher = mock(ParticipantMessagePublisher.class);
         var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener);
 
@@ -166,13 +165,13 @@
         participantDeploy.getAcElementList().add(acElementDeploy);
 
         ach.handleAcPropertyUpdate(updateMsg);
-        verify(listener).update(any(), any(), any());
+        verify(listener).update(any(), any(), any(), any());
     }
 
     @Test
-    void handleAutomationCompositionDeployTest() throws PfModelException {
+    void handleAutomationCompositionDeployTest() {
         var cacheProvider = mock(CacheProvider.class);
-        var listener = mock(AutomationCompositionElementListener.class);
+        var listener = mock(ThreadHandler.class);
         var participantMessagePublisher = mock(ParticipantMessagePublisher.class);
         var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener);
 
@@ -196,27 +195,29 @@
             participantDeploy.getAcElementList().add(acElementDeploy);
         }
         ach.handleAutomationCompositionDeploy(deployMsg);
-        verify(listener, times(automationComposition.getElements().size())).deploy(any(), any(), any());
+        verify(listener, times(automationComposition.getElements().size())).deploy(any(), any(), any(), any());
     }
 
     @Test
-    void handleComposiotPrimeTest() throws PfModelException {
-        var listener = mock(AutomationCompositionElementListener.class);
+    void handleComposiotPrimeTest() {
+        var listener = mock(ThreadHandler.class);
         var ach = new AutomationCompositionHandler(mock(CacheProvider.class), mock(ParticipantMessagePublisher.class),
                 listener);
         var compositionId = UUID.randomUUID();
         var list = List.of(new AutomationCompositionElementDefinition());
-        ach.prime(compositionId, list);
-        verify(listener).prime(compositionId, list);
+        var messageId = UUID.randomUUID();
+        ach.prime(messageId, compositionId, list);
+        verify(listener).prime(messageId, compositionId, list);
     }
 
     @Test
-    void handleComposiotDeprimeTest() throws PfModelException {
-        var listener = mock(AutomationCompositionElementListener.class);
+    void handleComposiotDeprimeTest() {
+        var listener = mock(ThreadHandler.class);
         var ach = new AutomationCompositionHandler(mock(CacheProvider.class), mock(ParticipantMessagePublisher.class),
                 listener);
         var compositionId = UUID.randomUUID();
-        ach.deprime(compositionId);
-        verify(listener).deprime(compositionId);
+        var messageId = UUID.randomUUID();
+        ach.deprime(messageId, compositionId);
+        verify(listener).deprime(messageId, compositionId);
     }
 }
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java
index 895d4ed..237cab2 100644
--- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java
+++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java
@@ -158,17 +158,20 @@
     void handleParticipantPrimeTest() {
         var cacheProvider = mock(CacheProvider.class);
         when(cacheProvider.getParticipantId()).thenReturn(CommonTestData.getParticipantId());
+
+        var participantPrime = new ParticipantPrime();
+        participantPrime.setCompositionId(UUID.randomUUID());
+        participantPrime.setMessageId(UUID.randomUUID());
+        participantPrime.setParticipantDefinitionUpdates(List.of(createParticipantDefinition()));
+
         var publisher = mock(ParticipantMessagePublisher.class);
         var acHandler = mock(AutomationCompositionHandler.class);
         var participantHandler = new ParticipantHandler(acHandler, mock(AutomationCompositionOutHandler.class),
                 publisher, cacheProvider);
 
-        var participantPrime = new ParticipantPrime();
-        participantPrime.setCompositionId(UUID.randomUUID());
-        participantPrime.setParticipantDefinitionUpdates(List.of(createParticipantDefinition()));
         participantHandler.handleParticipantPrime(participantPrime);
         verify(cacheProvider).addElementDefinition(any(), any());
-        verify(acHandler).prime(any(), any());
+        verify(acHandler).prime(any(), any(), any());
     }
 
     @Test
@@ -182,9 +185,11 @@
         var participantPrime = new ParticipantPrime();
         var compositionId = UUID.randomUUID();
         participantPrime.setCompositionId(compositionId);
+        var messageId = UUID.randomUUID();
+        participantPrime.setMessageId(messageId);
         participantHandler.handleParticipantPrime(participantPrime);
         verify(cacheProvider).removeElementDefinition(compositionId);
-        verify(acHandler).deprime(compositionId);
+        verify(acHandler).deprime(messageId, compositionId);
     }
 
     @Test
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandlerTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandlerTest.java
new file mode 100644
index 0000000..767a916
--- /dev/null
+++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandlerTest.java
@@ -0,0 +1,160 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.handler;
+
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import javax.ws.rs.core.Response.Status;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
+import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
+import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
+import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
+import org.onap.policy.clamp.models.acm.concepts.DeployState;
+import org.onap.policy.clamp.models.acm.concepts.LockState;
+import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
+import org.onap.policy.models.base.PfModelException;
+
+class ThreadHandlerTest {
+
+    private static final int TIMEOUT = 400;
+
+    @Test
+    void test() throws PfModelException {
+        var listener = mock(AutomationCompositionElementListener.class);
+        var intermediaryApi = mock(ParticipantIntermediaryApi.class);
+        var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class));
+
+        var compositionId = UUID.randomUUID();
+        var list = List.of(new AutomationCompositionElementDefinition());
+        var messageId = UUID.randomUUID();
+        threadHandler.prime(messageId, compositionId, list);
+        verify(listener, timeout(TIMEOUT)).prime(compositionId, list);
+
+        clearInvocations(listener);
+        var element = new AcElementDeploy();
+        var elementId = UUID.randomUUID();
+        element.setId(elementId);
+        Map<String, Object> properties = Map.of("key", "value");
+        var instanceId = UUID.randomUUID();
+        threadHandler.deploy(messageId, instanceId, element, properties);
+        verify(listener, timeout(TIMEOUT)).deploy(instanceId, element, properties);
+
+        clearInvocations(listener);
+        threadHandler.update(messageId, instanceId, element, properties);
+        verify(listener, timeout(TIMEOUT)).update(instanceId, element, properties);
+
+        clearInvocations(listener);
+        threadHandler.lock(messageId, instanceId, elementId);
+        verify(listener, timeout(TIMEOUT)).lock(instanceId, elementId);
+
+        clearInvocations(listener);
+        threadHandler.unlock(messageId, instanceId, elementId);
+        verify(listener, timeout(TIMEOUT)).unlock(instanceId, elementId);
+
+        clearInvocations(listener);
+        threadHandler.undeploy(messageId, instanceId, elementId);
+        verify(listener, timeout(TIMEOUT)).undeploy(instanceId, elementId);
+
+        clearInvocations(listener);
+        threadHandler.delete(messageId, instanceId, elementId);
+        verify(listener, timeout(TIMEOUT)).delete(instanceId, elementId);
+
+        clearInvocations(listener);
+        threadHandler.deprime(messageId, compositionId);
+        verify(listener, timeout(TIMEOUT)).deprime(compositionId);
+    }
+
+    /**
+     * Verifies that a {@link PfModelException} thrown by each listener callback (prime, deploy, update,
+     * lock, unlock, undeploy, delete, deprime) is reported through the intermediary API with
+     * {@code StateChangeResult.FAILED} and the expected state and message.
+     *
+     * @throws PfModelException declared only because the stubbed listener methods declare it
+     */
+    @Test
+    void testException() throws PfModelException {
+        var listener = mock(AutomationCompositionElementListener.class);
+        var intermediaryApi = mock(ParticipantIntermediaryApi.class);
+        var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class));
+
+        // prime failure
+        // NOTE(review): "Defintion" is misspelled in the expected messages; presumably it mirrors the text
+        // emitted by ThreadHandler, so it must only be corrected together with the production string.
+        var compositionId = UUID.randomUUID();
+        var list = List.of(new AutomationCompositionElementDefinition());
+        doThrow(new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error")).when(listener).prime(compositionId, list);
+        var messageId = UUID.randomUUID();
+        threadHandler.prime(messageId, compositionId, list);
+        verify(intermediaryApi, timeout(TIMEOUT)).updateCompositionState(compositionId, AcTypeState.COMMISSIONED,
+                StateChangeResult.FAILED, "Composition Defintion prime failed");
+
+        // deploy failure
+        // Reset the verified mock (not the listener) so every verify below only sees its own step's call.
+        clearInvocations(intermediaryApi);
+        var element = new AcElementDeploy();
+        var elementId = UUID.randomUUID();
+        element.setId(elementId);
+        Map<String, Object> properties = Map.of("key", "value");
+        var instanceId = UUID.randomUUID();
+        doThrow(new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error")).when(listener).deploy(instanceId, element,
+                properties);
+        threadHandler.deploy(messageId, instanceId, element, properties);
+        verify(intermediaryApi, timeout(TIMEOUT)).updateAutomationCompositionElementState(instanceId, elementId,
+                DeployState.UNDEPLOYED, null, StateChangeResult.FAILED, "Automation composition element deploy failed");
+
+        // update failure
+        clearInvocations(intermediaryApi);
+        doThrow(new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error")).when(listener).update(instanceId, element,
+                properties);
+        threadHandler.update(messageId, instanceId, element, properties);
+        verify(intermediaryApi, timeout(TIMEOUT)).updateAutomationCompositionElementState(instanceId, elementId,
+                DeployState.DEPLOYED, null, StateChangeResult.FAILED, "Automation composition element update failed");
+
+        // lock failure
+        clearInvocations(intermediaryApi);
+        doThrow(new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error")).when(listener).lock(instanceId, elementId);
+        threadHandler.lock(messageId, instanceId, elementId);
+        verify(intermediaryApi, timeout(TIMEOUT)).updateAutomationCompositionElementState(instanceId, elementId, null,
+                LockState.UNLOCKED, StateChangeResult.FAILED, "Automation composition element lock failed");
+
+        // unlock failure
+        clearInvocations(intermediaryApi);
+        doThrow(new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error")).when(listener).unlock(instanceId,
+                elementId);
+        threadHandler.unlock(messageId, instanceId, elementId);
+        verify(intermediaryApi, timeout(TIMEOUT)).updateAutomationCompositionElementState(instanceId, elementId, null,
+                LockState.LOCKED, StateChangeResult.FAILED, "Automation composition element unlock failed");
+
+        // undeploy failure
+        clearInvocations(intermediaryApi);
+        doThrow(new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error")).when(listener).undeploy(instanceId,
+                elementId);
+        threadHandler.undeploy(messageId, instanceId, elementId);
+        verify(intermediaryApi, timeout(TIMEOUT)).updateAutomationCompositionElementState(instanceId, elementId,
+                DeployState.DEPLOYED, null, StateChangeResult.FAILED, "Automation composition element undeploy failed");
+
+        // delete failure
+        clearInvocations(intermediaryApi);
+        doThrow(new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error")).when(listener).delete(instanceId,
+                elementId);
+        threadHandler.delete(messageId, instanceId, elementId);
+        verify(intermediaryApi, timeout(TIMEOUT)).updateAutomationCompositionElementState(instanceId, elementId,
+                DeployState.UNDEPLOYED, null, StateChangeResult.FAILED, "Automation composition element delete failed");
+
+        // deprime failure
+        clearInvocations(intermediaryApi);
+        doThrow(new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error")).when(listener).deprime(compositionId);
+        threadHandler.deprime(messageId, compositionId);
+        verify(intermediaryApi, timeout(TIMEOUT)).updateCompositionState(compositionId,
+                AcTypeState.PRIMED, StateChangeResult.FAILED, "Composition Defintion deprime failed");
+    }
+}