Merge "Add multiple messages support in Intermediary"
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java
index 6ca4d3c..48b60dc 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/api/AutomationCompositionElementListener.java
@@ -37,7 +37,7 @@
* @param automationCompositionElementId the ID of the automation composition element
* @throws PfModelException in case of a model exception
*/
- public void undeploy(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+ void undeploy(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
/**
* Handle an update on a automation composition element.
@@ -47,20 +47,20 @@
* @param properties properties Map
* @throws PfModelException from Policy framework
*/
- public void deploy(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
+ void deploy(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
throws PfModelException;
- public void lock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+ void lock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
- public void unlock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+ void unlock(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
- public void delete(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
+ void delete(UUID automationCompositionId, UUID automationCompositionElementId) throws PfModelException;
- public void update(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
+ void update(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
throws PfModelException;
- public void prime(UUID compositionId, List<AutomationCompositionElementDefinition> elementDefinitionList)
+ void prime(UUID compositionId, List<AutomationCompositionElementDefinition> elementDefinitionList)
throws PfModelException;
- public void deprime(UUID compositionId) throws PfModelException;
+ void deprime(UUID compositionId) throws PfModelException;
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java
index 7e71365..71c9d9a 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandler.java
@@ -24,13 +24,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
-import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
import org.onap.policy.clamp.models.acm.concepts.ParticipantDeploy;
import org.onap.policy.clamp.models.acm.concepts.ParticipantUtils;
+import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCompositionDeploy;
import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCompositionDeployAck;
import org.onap.policy.clamp.models.acm.messages.dmaap.participant.AutomationCompositionStateChange;
@@ -39,7 +39,6 @@
import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder;
import org.onap.policy.clamp.models.acm.persistence.provider.AcInstanceStateResolver;
-import org.onap.policy.models.base.PfModelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -53,7 +52,7 @@
private final CacheProvider cacheProvider;
private final ParticipantMessagePublisher publisher;
- private final AutomationCompositionElementListener listener;
+ private final ThreadHandler listener;
private final AcInstanceStateResolver acInstanceStateResolver;
/**
@@ -64,7 +63,7 @@
* @param listener the ThreadHandler Listener
*/
public AutomationCompositionHandler(CacheProvider cacheProvider, ParticipantMessagePublisher publisher,
- AutomationCompositionElementListener listener) {
+ ThreadHandler listener) {
this.cacheProvider = cacheProvider;
this.publisher = publisher;
this.listener = listener;
@@ -84,17 +83,20 @@
var automationComposition = cacheProvider.getAutomationComposition(stateChangeMsg.getAutomationCompositionId());
if (automationComposition == null) {
- var automationCompositionAck =
- new AutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK);
- automationCompositionAck.setParticipantId(cacheProvider.getParticipantId());
- automationCompositionAck.setMessage("Automation composition " + stateChangeMsg.getAutomationCompositionId()
- + " does not use this participant " + cacheProvider.getParticipantId());
- automationCompositionAck.setResult(false);
- automationCompositionAck.setResponseTo(stateChangeMsg.getMessageId());
- automationCompositionAck.setAutomationCompositionId(stateChangeMsg.getAutomationCompositionId());
- publisher.sendAutomationCompositionAck(automationCompositionAck);
- LOGGER.debug("Automation composition {} does not use this participant",
- stateChangeMsg.getAutomationCompositionId());
+ if (DeployOrder.DELETE.equals(stateChangeMsg.getDeployOrderedState())) {
+ var automationCompositionAck = new AutomationCompositionDeployAck(
+ ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK);
+ automationCompositionAck.setParticipantId(cacheProvider.getParticipantId());
+ automationCompositionAck.setMessage("Already deleted or never used");
+ automationCompositionAck.setResult(true);
+ automationCompositionAck.setStateChangeResult(StateChangeResult.NO_ERROR);
+ automationCompositionAck.setResponseTo(stateChangeMsg.getMessageId());
+ automationCompositionAck.setAutomationCompositionId(stateChangeMsg.getAutomationCompositionId());
+ publisher.sendAutomationCompositionAck(automationCompositionAck);
+ } else {
+ LOGGER.debug("Automation composition {} does not use this participant",
+ stateChangeMsg.getAutomationCompositionId());
+ }
return;
}
@@ -106,18 +108,18 @@
}
if (DeployOrder.NONE.equals(stateChangeMsg.getDeployOrderedState())) {
- handleLockOrderState(automationComposition, stateChangeMsg.getLockOrderedState(),
- stateChangeMsg.getStartPhase());
+ handleLockOrderState(stateChangeMsg.getMessageId(), automationComposition,
+ stateChangeMsg.getLockOrderedState(), stateChangeMsg.getStartPhase());
} else {
- handleDeployOrderState(automationComposition, stateChangeMsg.getDeployOrderedState(),
- stateChangeMsg.getStartPhase());
+ handleDeployOrderState(stateChangeMsg.getMessageId(), automationComposition,
+ stateChangeMsg.getDeployOrderedState(), stateChangeMsg.getStartPhase());
}
}
private boolean checkConsistantOrderState(AutomationComposition automationComposition, DeployOrder deployOrder,
LockOrder lockOrder) {
if (DeployOrder.UPDATE.equals(deployOrder)) {
- return false;
+ return true;
}
return acInstanceStateResolver.resolve(deployOrder, lockOrder, automationComposition.getDeployState(),
automationComposition.getLockState(), automationComposition.getStateChangeResult()) != null;
@@ -126,19 +128,20 @@
/**
* Method to handle state changes.
*
+ * @param messageId the messageId
* @param automationComposition participant response
* @param orderedState automation composition ordered state
* @param startPhaseMsg startPhase from message
*/
- private void handleDeployOrderState(final AutomationComposition automationComposition, DeployOrder orderedState,
- Integer startPhaseMsg) {
+ private void handleDeployOrderState(UUID messageId, final AutomationComposition automationComposition,
+ DeployOrder orderedState, Integer startPhaseMsg) {
switch (orderedState) {
case UNDEPLOY:
- handleUndeployState(automationComposition, startPhaseMsg);
+ handleUndeployState(messageId, automationComposition, startPhaseMsg);
break;
case DELETE:
- handleDeleteState(automationComposition, startPhaseMsg);
+ handleDeleteState(messageId, automationComposition, startPhaseMsg);
break;
default:
@@ -150,19 +153,20 @@
/**
* Method to handle state changes.
*
+ * @param messageId the messageId
* @param automationComposition participant response
* @param orderedState automation composition ordered state
* @param startPhaseMsg startPhase from message
*/
- private void handleLockOrderState(final AutomationComposition automationComposition, LockOrder orderedState,
- Integer startPhaseMsg) {
+ private void handleLockOrderState(UUID messageId, final AutomationComposition automationComposition,
+ LockOrder orderedState, Integer startPhaseMsg) {
switch (orderedState) {
case LOCK:
- handleLockState(automationComposition, startPhaseMsg);
+ handleLockState(messageId, automationComposition, startPhaseMsg);
break;
case UNLOCK:
- handleUnlockState(automationComposition, startPhaseMsg);
+ handleUnlockState(messageId, automationComposition, startPhaseMsg);
break;
default:
LOGGER.error("StateChange message has no state, state is null {}", automationComposition.getKey());
@@ -188,7 +192,7 @@
updateExistingElementsOnThisParticipant(updateMsg.getAutomationCompositionId(), participantDeploy);
- callParticipantUpdateProperty(participantDeploy.getAcElementList(),
+ callParticipantUpdateProperty(updateMsg.getMessageId(), participantDeploy.getAcElementList(),
updateMsg.getAutomationCompositionId());
}
}
@@ -212,35 +216,28 @@
cacheProvider.initializeAutomationComposition(deployMsg.getCompositionId(),
deployMsg.getAutomationCompositionId(), participantDeploy);
}
- callParticipanDeploy(participantDeploy.getAcElementList(), deployMsg.getStartPhase(),
- deployMsg.getAutomationCompositionId());
+ callParticipanDeploy(deployMsg.getMessageId(), participantDeploy.getAcElementList(),
+ deployMsg.getStartPhase(), deployMsg.getAutomationCompositionId());
}
}
}
- private void callParticipanDeploy(List<AcElementDeploy> acElements, Integer startPhaseMsg, UUID instanceId) {
- try {
- for (var element : acElements) {
- var commonProperties = cacheProvider.getCommonProperties(instanceId, element.getId());
- int startPhase = ParticipantUtils.findStartPhase(commonProperties);
- if (startPhaseMsg.equals(startPhase)) {
- var map = new HashMap<>(commonProperties);
- map.putAll(element.getProperties());
- listener.deploy(instanceId, element, map);
- }
+ private void callParticipanDeploy(UUID messageId, List<AcElementDeploy> acElements, Integer startPhaseMsg,
+ UUID instanceId) {
+ for (var element : acElements) {
+ var commonProperties = cacheProvider.getCommonProperties(instanceId, element.getId());
+ int startPhase = ParticipantUtils.findStartPhase(commonProperties);
+ if (startPhaseMsg.equals(startPhase)) {
+ var map = new HashMap<>(commonProperties);
+ map.putAll(element.getProperties());
+ listener.deploy(messageId, instanceId, element, map);
}
- } catch (PfModelException e) {
- LOGGER.debug("Automation composition element Deploy failed {}", instanceId);
}
}
- private void callParticipantUpdateProperty(List<AcElementDeploy> acElements, UUID instanceId) {
- try {
- for (var element : acElements) {
- listener.update(instanceId, element, element.getProperties());
- }
- } catch (PfModelException e) {
- LOGGER.debug("Automation composition element update failed {}", instanceId);
+ private void callParticipantUpdateProperty(UUID messageId, List<AcElementDeploy> acElements, UUID instanceId) {
+ for (var element : acElements) {
+ listener.update(messageId, instanceId, element, element.getProperties());
}
}
@@ -255,101 +252,86 @@
/**
* Method to handle when the new state from participant is UNINITIALISED state.
*
+ * @param messageId the messageId
* @param automationComposition participant response
* @param startPhaseMsg startPhase from message
*/
- private void handleUndeployState(final AutomationComposition automationComposition, Integer startPhaseMsg) {
- try {
- for (var acElement : automationComposition.getElements().values()) {
- int startPhase = ParticipantUtils.findStartPhase(
- cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
- if (startPhaseMsg.equals(startPhase)) {
- listener.undeploy(automationComposition.getInstanceId(), acElement.getId());
- }
+ private void handleUndeployState(UUID messageId, final AutomationComposition automationComposition,
+ Integer startPhaseMsg) {
+ for (var acElement : automationComposition.getElements().values()) {
+ int startPhase = ParticipantUtils.findStartPhase(
+ cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
+ if (startPhaseMsg.equals(startPhase)) {
+ listener.undeploy(messageId, automationComposition.getInstanceId(), acElement.getId());
}
- } catch (PfModelException e) {
- LOGGER.debug("Automation composition element Undeploy failed {}", automationComposition.getInstanceId());
}
}
- private void handleDeleteState(final AutomationComposition automationComposition, Integer startPhaseMsg) {
- try {
- for (var acElement : automationComposition.getElements().values()) {
- int startPhase = ParticipantUtils.findStartPhase(
- cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
- if (startPhaseMsg.equals(startPhase)) {
- listener.delete(automationComposition.getInstanceId(), acElement.getId());
- }
+ private void handleDeleteState(UUID messageId, final AutomationComposition automationComposition,
+ Integer startPhaseMsg) {
+ for (var acElement : automationComposition.getElements().values()) {
+ int startPhase = ParticipantUtils.findStartPhase(
+ cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
+ if (startPhaseMsg.equals(startPhase)) {
+ listener.delete(messageId, automationComposition.getInstanceId(), acElement.getId());
}
- } catch (PfModelException e) {
- LOGGER.debug("Automation composition element Delete failed {}", automationComposition.getInstanceId());
}
}
/**
* Method to handle when the new state from participant is PASSIVE state.
*
+ * @param messageId the messageId
* @param automationComposition participant response
* @param startPhaseMsg startPhase from message
*/
- private void handleLockState(final AutomationComposition automationComposition, Integer startPhaseMsg) {
- try {
- for (var acElement : automationComposition.getElements().values()) {
- int startPhase = ParticipantUtils.findStartPhase(
- cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
- if (startPhaseMsg.equals(startPhase)) {
- listener.lock(automationComposition.getInstanceId(), acElement.getId());
- }
+ private void handleLockState(UUID messageId, final AutomationComposition automationComposition,
+ Integer startPhaseMsg) {
+ for (var acElement : automationComposition.getElements().values()) {
+ int startPhase = ParticipantUtils.findStartPhase(
+ cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
+ if (startPhaseMsg.equals(startPhase)) {
+ listener.lock(messageId, automationComposition.getInstanceId(), acElement.getId());
}
- } catch (PfModelException e) {
- LOGGER.debug("Automation composition element Lock failed {}", automationComposition.getInstanceId());
}
}
/**
* Method to handle when the new state from participant is RUNNING state.
*
+ * @param messageId the messageId
* @param automationComposition participant response
* @param startPhaseMsg startPhase from message
*/
- private void handleUnlockState(final AutomationComposition automationComposition, Integer startPhaseMsg) {
- try {
- for (var acElement : automationComposition.getElements().values()) {
- int startPhase = ParticipantUtils.findStartPhase(
- cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
- if (startPhaseMsg.equals(startPhase)) {
- listener.unlock(automationComposition.getInstanceId(), acElement.getId());
- }
+ private void handleUnlockState(UUID messageId, final AutomationComposition automationComposition,
+ Integer startPhaseMsg) {
+ for (var acElement : automationComposition.getElements().values()) {
+ int startPhase = ParticipantUtils.findStartPhase(
+ cacheProvider.getCommonProperties(automationComposition.getInstanceId(), acElement.getId()));
+ if (startPhaseMsg.equals(startPhase)) {
+ listener.unlock(messageId, automationComposition.getInstanceId(), acElement.getId());
}
- } catch (PfModelException e) {
- LOGGER.debug("Automation composition element Unlock failed {}", automationComposition.getInstanceId());
}
}
/**
* Handles prime a Composition Definition.
*
+ * @param messageId the messageId
* @param compositionId the compositionId
* @param list the list of AutomationCompositionElementDefinition
*/
- public void prime(UUID compositionId, List<AutomationCompositionElementDefinition> list) {
- try {
- listener.prime(compositionId, list);
- } catch (PfModelException e) {
- LOGGER.debug("Composition prime failed {}", compositionId);
- }
+ public void prime(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list) {
+ listener.prime(messageId, compositionId, list);
}
/**
* Handles deprime a Composition Definition.
*
+ * @param messageId the messageId
* @param compositionId the compositionId
*/
- public void deprime(UUID compositionId) {
- try {
- listener.deprime(compositionId);
- } catch (PfModelException e) {
- LOGGER.debug("Composition deprime failed {}", compositionId);
- }
+ public void deprime(UUID messageId, UUID compositionId) {
+ listener.deprime(messageId, compositionId);
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java
index 2c34652..cfd61c4 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionOutHandler.java
@@ -99,6 +99,7 @@
new AutomationCompositionDeployAck(ParticipantMessageType.AUTOMATION_COMPOSITION_STATECHANGE_ACK);
automationCompositionStateChangeAck.setParticipantId(cacheProvider.getParticipantId());
automationCompositionStateChangeAck.setMessage(message);
+ automationCompositionStateChangeAck.setResponseTo(cacheProvider.getMsgIdentification().get(element.getId()));
automationCompositionStateChangeAck.setStateChangeResult(stateChangeResult);
automationCompositionStateChangeAck.setAutomationCompositionId(automationCompositionId);
automationCompositionStateChangeAck.getAutomationCompositionResultMap().put(element.getId(),
@@ -107,6 +108,7 @@
LOGGER.debug("Automation composition element {} state changed to {}", elementId, deployState);
automationCompositionStateChangeAck.setResult(true);
publisher.sendAutomationCompositionAck(automationCompositionStateChangeAck);
+ cacheProvider.getMsgIdentification().remove(element.getId());
}
private void handleDeployState(AutomationComposition automationComposition, AutomationCompositionElement element,
@@ -213,10 +215,12 @@
participantPrimeAck.setCompositionId(compositionId);
participantPrimeAck.setMessage(message);
participantPrimeAck.setResult(true);
+ participantPrimeAck.setResponseTo(cacheProvider.getMsgIdentification().get(compositionId));
participantPrimeAck.setCompositionState(state);
participantPrimeAck.setStateChangeResult(stateChangeResult);
participantPrimeAck.setParticipantId(cacheProvider.getParticipantId());
participantPrimeAck.setState(ParticipantState.ON_LINE);
publisher.sendParticipantPrimeAck(participantPrimeAck);
+ cacheProvider.getMsgIdentification().remove(compositionId);
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
index 09e75e8..119cc11 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/CacheProvider.java
@@ -55,6 +55,9 @@
private final Map<UUID, Map<ToscaConceptIdentifier, AutomationCompositionElementDefinition>> acElementsDefinitions =
new ConcurrentHashMap<>();
+ @Getter
+ private final Map<UUID, UUID> msgIdentification = new ConcurrentHashMap<>();
+
/**
* Constructor.
*
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
index 0e7e193..3a3a0cc 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
@@ -186,11 +186,13 @@
}
}
cacheProvider.addElementDefinition(participantPrimeMsg.getCompositionId(), list);
- automationCompositionHandler.prime(participantPrimeMsg.getCompositionId(), list);
+ automationCompositionHandler.prime(participantPrimeMsg.getMessageId(),
+ participantPrimeMsg.getCompositionId(), list);
} else {
// deprime
cacheProvider.removeElementDefinition(participantPrimeMsg.getCompositionId());
- automationCompositionHandler.deprime(participantPrimeMsg.getCompositionId());
+ automationCompositionHandler.deprime(participantPrimeMsg.getMessageId(),
+ participantPrimeMsg.getCompositionId());
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java
new file mode 100644
index 0000000..b5866d7
--- /dev/null
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandler.java
@@ -0,0 +1,287 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.handler;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import lombok.RequiredArgsConstructor;
+import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
+import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
+import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
+import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
+import org.onap.policy.clamp.models.acm.concepts.DeployState;
+import org.onap.policy.clamp.models.acm.concepts.LockState;
+import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
+import org.onap.policy.models.base.PfModelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * Runs the participant listener callbacks asynchronously on a fixed thread pool.
+ *
+ * <p>At most one execution is tracked per element id or composition id: starting a new execution for an id
+ * cancels the one still registered for it. A {@link PfModelException} thrown by the listener is logged and
+ * reported through the {@link ParticipantIntermediaryApi} as a failed state change.
+ */
+@Component
+@RequiredArgsConstructor
+public class ThreadHandler implements Closeable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class);
+
+    private final AutomationCompositionElementListener listener;
+    private final ParticipantIntermediaryApi intermediaryApi;
+    private final CacheProvider cacheProvider;
+
+    // Execution currently registered for each element id or composition id.
+    private final Map<UUID, Future<?>> executionMap = new ConcurrentHashMap<>();
+
+    private final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+
+    /**
+     * Handle a deploy on an automation composition element.
+     *
+     * @param messageId the messageId
+     * @param instanceId the automationComposition Id
+     * @param element the information on the automation composition element
+     * @param properties properties Map
+     */
+    public void deploy(UUID messageId, UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
+        execute(element.getId(), messageId, () -> this.deployProcess(instanceId, element, properties));
+    }
+
+    private void deployProcess(UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
+        try {
+            listener.deploy(instanceId, element, properties);
+        } catch (PfModelException e) {
+            LOGGER.error("Automation composition element deploy failed {} {}", instanceId, e.getMessage(), e);
+            intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.UNDEPLOYED,
+                    null, StateChangeResult.FAILED, "Automation composition element deploy failed");
+        }
+    }
+
+    /**
+     * Handle an automation composition element undeploy.
+     *
+     * @param messageId the messageId
+     * @param instanceId the automationComposition Id
+     * @param elementId the ID of the automation composition element
+     */
+    public void undeploy(UUID messageId, UUID instanceId, UUID elementId) {
+        execute(elementId, messageId, () -> this.undeployProcess(instanceId, elementId));
+    }
+
+    private void undeployProcess(UUID instanceId, UUID elementId) {
+        try {
+            listener.undeploy(instanceId, elementId);
+        } catch (PfModelException e) {
+            LOGGER.error("Automation composition element undeploy failed {} {}", instanceId, e.getMessage(), e);
+            intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.DEPLOYED, null,
+                    StateChangeResult.FAILED, "Automation composition element undeploy failed");
+        }
+    }
+
+    /**
+     * Handle an automation composition element lock.
+     *
+     * @param messageId the messageId
+     * @param instanceId the automationComposition Id
+     * @param elementId the ID of the automation composition element
+     */
+    public void lock(UUID messageId, UUID instanceId, UUID elementId) {
+        execute(elementId, messageId, () -> this.lockProcess(instanceId, elementId));
+    }
+
+    private void lockProcess(UUID instanceId, UUID elementId) {
+        try {
+            listener.lock(instanceId, elementId);
+        } catch (PfModelException e) {
+            LOGGER.error("Automation composition element lock failed {} {}", instanceId, e.getMessage(), e);
+            intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.UNLOCKED,
+                    StateChangeResult.FAILED, "Automation composition element lock failed");
+        }
+    }
+
+    /**
+     * Handle an automation composition element unlock.
+     *
+     * @param messageId the messageId
+     * @param instanceId the automationComposition Id
+     * @param elementId the ID of the automation composition element
+     */
+    public void unlock(UUID messageId, UUID instanceId, UUID elementId) {
+        execute(elementId, messageId, () -> this.unlockProcess(instanceId, elementId));
+    }
+
+    private void unlockProcess(UUID instanceId, UUID elementId) {
+        try {
+            listener.unlock(instanceId, elementId);
+        } catch (PfModelException e) {
+            LOGGER.error("Automation composition element unlock failed {} {}", instanceId, e.getMessage(), e);
+            intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, null, LockState.LOCKED,
+                    StateChangeResult.FAILED, "Automation composition element unlock failed");
+        }
+    }
+
+    /**
+     * Handle an automation composition element delete.
+     *
+     * @param messageId the messageId
+     * @param instanceId the automationComposition Id
+     * @param elementId the ID of the automation composition element
+     */
+    public void delete(UUID messageId, UUID instanceId, UUID elementId) {
+        execute(elementId, messageId, () -> this.deleteProcess(instanceId, elementId));
+    }
+
+    private void deleteProcess(UUID instanceId, UUID elementId) {
+        try {
+            listener.delete(instanceId, elementId);
+        } catch (PfModelException e) {
+            LOGGER.error("Automation composition element delete failed {} {}", instanceId, e.getMessage(), e);
+            intermediaryApi.updateAutomationCompositionElementState(instanceId, elementId, DeployState.UNDEPLOYED, null,
+                    StateChangeResult.FAILED, "Automation composition element delete failed");
+        }
+    }
+
+    /**
+     * Handle an automation composition element properties update.
+     *
+     * @param messageId the messageId
+     * @param instanceId the automationComposition Id
+     * @param element the information on the automation composition element
+     * @param properties properties Map
+     */
+    public void update(UUID messageId, UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
+        execute(element.getId(), messageId, () -> this.updateProcess(instanceId, element, properties));
+    }
+
+    private void updateProcess(UUID instanceId, AcElementDeploy element, Map<String, Object> properties) {
+        try {
+            listener.update(instanceId, element, properties);
+        } catch (PfModelException e) {
+            LOGGER.error("Automation composition element update failed {} {}", instanceId, e.getMessage(), e);
+            intermediaryApi.updateAutomationCompositionElementState(instanceId, element.getId(), DeployState.DEPLOYED,
+                    null, StateChangeResult.FAILED, "Automation composition element update failed");
+        }
+    }
+
+    /**
+     * Handles prime a Composition Definition.
+     *
+     * @param messageId the messageId
+     * @param compositionId the compositionId
+     * @param list the list of AutomationCompositionElementDefinition
+     */
+    public void prime(UUID messageId, UUID compositionId, List<AutomationCompositionElementDefinition> list) {
+        execute(compositionId, messageId, () -> this.primeProcess(compositionId, list));
+    }
+
+    private void primeProcess(UUID compositionId, List<AutomationCompositionElementDefinition> list) {
+        try {
+            listener.prime(compositionId, list);
+        } catch (PfModelException e) {
+            LOGGER.error("Composition Defintion prime failed {} {}", compositionId, e.getMessage(), e);
+            intermediaryApi.updateCompositionState(compositionId, AcTypeState.COMMISSIONED, StateChangeResult.FAILED,
+                    "Composition Defintion prime failed");
+        }
+    }
+
+    /**
+     * Handles deprime a Composition Definition.
+     *
+     * @param messageId the messageId
+     * @param compositionId the compositionId
+     */
+    public void deprime(UUID messageId, UUID compositionId) {
+        execute(compositionId, messageId, () -> this.deprimeProcess(compositionId));
+    }
+
+    private void deprimeProcess(UUID compositionId) {
+        try {
+            listener.deprime(compositionId);
+        } catch (PfModelException e) {
+            LOGGER.error("Composition Defintion deprime failed {} {}", compositionId, e.getMessage(), e);
+            intermediaryApi.updateCompositionState(compositionId, AcTypeState.PRIMED, StateChangeResult.FAILED,
+                    "Composition Defintion deprime failed");
+        }
+    }
+
+    /**
+     * Cancel the execution registered for the given id, if any, and start a new one on the executor.
+     *
+     * @param executionId the element id or composition id the execution belongs to
+     * @param messageId the id of the message that requested the execution
+     * @param process the work to run on the executor
+     */
+    private void execute(UUID executionId, UUID messageId, Runnable process) {
+        var task = new FutureTask<Void>(() -> runLogged(executionId, process), null);
+        // Register before starting, so a task that finishes immediately cannot leave a stale entry behind.
+        var previous = executionMap.put(executionId, task);
+        if (previous != null) {
+            previous.cancel(true);
+        }
+        cacheProvider.getMsgIdentification().put(executionId, messageId);
+        executor.execute(() -> {
+            try {
+                task.run();
+            } finally {
+                // Conditional remove: a cancelled task must not unregister the execution that replaced it.
+                executionMap.remove(executionId, task);
+            }
+        });
+    }
+
+    /**
+     * Run the process, logging any unexpected runtime failure before rethrowing it.
+     *
+     * @param executionId the element id or composition id the execution belongs to
+     * @param process the work to run
+     */
+    private void runLogged(UUID executionId, Runnable process) {
+        try {
+            process.run();
+        } catch (RuntimeException e) {
+            // The enclosing task would otherwise capture this exception without any trace.
+            LOGGER.error("Unexpected failure in execution {}", executionId, e);
+            throw e;
+        }
+    }
+
+    /**
+     * Shuts down the executor: submitted executions are left to complete, new ones are rejected.
+     *
+     * @throws IOException declared by {@link Closeable}; not thrown by this implementation
+     */
+    @Override
+    public void close() throws IOException {
+        executor.shutdown();
+    }
+}
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandlerTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandlerTest.java
index b4397b4..dd49ee3 100644
--- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandlerTest.java
+++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/AutomationCompositionHandlerTest.java
@@ -32,7 +32,6 @@
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
import org.onap.policy.clamp.acm.participant.intermediary.comm.ParticipantMessagePublisher;
import org.onap.policy.clamp.acm.participant.intermediary.main.parameters.CommonTestData;
import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
@@ -44,7 +43,6 @@
import org.onap.policy.clamp.models.acm.messages.dmaap.participant.PropertiesUpdate;
import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
import org.onap.policy.clamp.models.acm.messages.rest.instantiation.LockOrder;
-import org.onap.policy.models.base.PfModelException;
import org.springframework.test.context.junit.jupiter.SpringExtension;
@ExtendWith(SpringExtension.class)
@@ -54,13 +52,14 @@
void handleAutomationCompositionStateChangeNullTest() {
var participantMessagePublisher = mock(ParticipantMessagePublisher.class);
var cacheProvider = mock(CacheProvider.class);
- var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher,
- mock(AutomationCompositionElementListener.class));
+ var ach =
+ new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, mock(ThreadHandler.class));
var automationCompositionStateChange = new AutomationCompositionStateChange();
assertDoesNotThrow(() -> ach.handleAutomationCompositionStateChange(automationCompositionStateChange));
automationCompositionStateChange.setAutomationCompositionId(UUID.randomUUID());
+ automationCompositionStateChange.setDeployOrderedState(DeployOrder.DELETE);
assertDoesNotThrow(() -> ach.handleAutomationCompositionStateChange(automationCompositionStateChange));
verify(participantMessagePublisher).sendAutomationCompositionAck(any(AutomationCompositionDeployAck.class));
@@ -73,7 +72,7 @@
}
@Test
- void handleAutomationCompositionStateChangeUndeployTest() throws PfModelException {
+ void handleAutomationCompositionStateChangeUndeployTest() {
var automationComposition = CommonTestData.getTestAutomationCompositionMap().values().iterator().next();
var automationCompositionStateChange = CommonTestData.getStateChange(CommonTestData.getParticipantId(),
automationComposition.getInstanceId(), DeployOrder.UNDEPLOY, LockOrder.NONE);
@@ -83,14 +82,14 @@
when(cacheProvider.getCommonProperties(any(UUID.class), any(UUID.class))).thenReturn(Map.of());
var participantMessagePublisher = mock(ParticipantMessagePublisher.class);
- var listener = mock(AutomationCompositionElementListener.class);
+ var listener = mock(ThreadHandler.class);
var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener);
ach.handleAutomationCompositionStateChange(automationCompositionStateChange);
- verify(listener, times(automationComposition.getElements().size())).undeploy(any(), any());
+ verify(listener, times(automationComposition.getElements().size())).undeploy(any(), any(), any());
}
@Test
- void handleAutomationCompositionStateChangeLockTest() throws PfModelException {
+ void handleAutomationCompositionStateChangeLockTest() {
var automationComposition = CommonTestData.getTestAutomationCompositionMap().values().iterator().next();
var automationCompositionStateChange = CommonTestData.getStateChange(CommonTestData.getParticipantId(),
automationComposition.getInstanceId(), DeployOrder.NONE, LockOrder.LOCK);
@@ -100,14 +99,14 @@
when(cacheProvider.getCommonProperties(any(UUID.class), any(UUID.class))).thenReturn(Map.of());
var participantMessagePublisher = mock(ParticipantMessagePublisher.class);
- var listener = mock(AutomationCompositionElementListener.class);
+ var listener = mock(ThreadHandler.class);
var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener);
ach.handleAutomationCompositionStateChange(automationCompositionStateChange);
- verify(listener, times(automationComposition.getElements().size())).lock(any(), any());
+ verify(listener, times(automationComposition.getElements().size())).lock(any(), any(), any());
}
@Test
- void handleAutomationCompositionStateChangeUnlockTest() throws PfModelException {
+ void handleAutomationCompositionStateChangeUnlockTest() {
var automationComposition = CommonTestData.getTestAutomationCompositionMap().values().iterator().next();
var automationCompositionStateChange = CommonTestData.getStateChange(CommonTestData.getParticipantId(),
automationComposition.getInstanceId(), DeployOrder.NONE, LockOrder.UNLOCK);
@@ -117,14 +116,14 @@
when(cacheProvider.getCommonProperties(any(UUID.class), any(UUID.class))).thenReturn(Map.of());
var participantMessagePublisher = mock(ParticipantMessagePublisher.class);
- var listener = mock(AutomationCompositionElementListener.class);
+ var listener = mock(ThreadHandler.class);
var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener);
ach.handleAutomationCompositionStateChange(automationCompositionStateChange);
- verify(listener, times(automationComposition.getElements().size())).unlock(any(), any());
+ verify(listener, times(automationComposition.getElements().size())).unlock(any(), any(), any());
}
@Test
- void handleAutomationCompositionStateChangeDeleteTest() throws PfModelException {
+ void handleAutomationCompositionStateChangeDeleteTest() {
var automationComposition = CommonTestData.getTestAutomationCompositionMap().values().iterator().next();
var automationCompositionStateChange = CommonTestData.getStateChange(CommonTestData.getParticipantId(),
automationComposition.getInstanceId(), DeployOrder.DELETE, LockOrder.NONE);
@@ -134,16 +133,16 @@
when(cacheProvider.getCommonProperties(any(UUID.class), any(UUID.class))).thenReturn(Map.of());
var participantMessagePublisher = mock(ParticipantMessagePublisher.class);
- var listener = mock(AutomationCompositionElementListener.class);
+ var listener = mock(ThreadHandler.class);
var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener);
ach.handleAutomationCompositionStateChange(automationCompositionStateChange);
- verify(listener, times(automationComposition.getElements().size())).delete(any(), any());
+ verify(listener, times(automationComposition.getElements().size())).delete(any(), any(), any());
}
@Test
- void handleAcPropertyUpdateTest() throws PfModelException {
+ void handleAcPropertyUpdateTest() {
var cacheProvider = mock(CacheProvider.class);
- var listener = mock(AutomationCompositionElementListener.class);
+ var listener = mock(ThreadHandler.class);
var participantMessagePublisher = mock(ParticipantMessagePublisher.class);
var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener);
@@ -166,13 +165,13 @@
participantDeploy.getAcElementList().add(acElementDeploy);
ach.handleAcPropertyUpdate(updateMsg);
- verify(listener).update(any(), any(), any());
+ verify(listener).update(any(), any(), any(), any());
}
@Test
- void handleAutomationCompositionDeployTest() throws PfModelException {
+ void handleAutomationCompositionDeployTest() {
var cacheProvider = mock(CacheProvider.class);
- var listener = mock(AutomationCompositionElementListener.class);
+ var listener = mock(ThreadHandler.class);
var participantMessagePublisher = mock(ParticipantMessagePublisher.class);
var ach = new AutomationCompositionHandler(cacheProvider, participantMessagePublisher, listener);
@@ -196,27 +195,29 @@
participantDeploy.getAcElementList().add(acElementDeploy);
}
ach.handleAutomationCompositionDeploy(deployMsg);
- verify(listener, times(automationComposition.getElements().size())).deploy(any(), any(), any());
+ verify(listener, times(automationComposition.getElements().size())).deploy(any(), any(), any(), any());
}
@Test
- void handleComposiotPrimeTest() throws PfModelException {
- var listener = mock(AutomationCompositionElementListener.class);
+ void handleComposiotPrimeTest() {
+ var listener = mock(ThreadHandler.class); // third constructor argument of the handler under test
var ach = new AutomationCompositionHandler(mock(CacheProvider.class), mock(ParticipantMessagePublisher.class),
listener);
var compositionId = UUID.randomUUID();
var list = List.of(new AutomationCompositionElementDefinition());
- ach.prime(compositionId, list);
- verify(listener).prime(compositionId, list);
+ var messageId = UUID.randomUUID();
+ ach.prime(messageId, compositionId, list);
+ verify(listener).prime(messageId, compositionId, list); // same three arguments must reach the ThreadHandler mock
}
@Test
- void handleComposiotDeprimeTest() throws PfModelException {
- var listener = mock(AutomationCompositionElementListener.class);
+ void handleComposiotDeprimeTest() {
+ var listener = mock(ThreadHandler.class); // third constructor argument of the handler under test
var ach = new AutomationCompositionHandler(mock(CacheProvider.class), mock(ParticipantMessagePublisher.class),
listener);
var compositionId = UUID.randomUUID();
- ach.deprime(compositionId);
- verify(listener).deprime(compositionId);
+ var messageId = UUID.randomUUID();
+ ach.deprime(messageId, compositionId);
+ verify(listener).deprime(messageId, compositionId); // same two ids must reach the ThreadHandler mock
}
}
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java
index 895d4ed..237cab2 100644
--- a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java
+++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandlerTest.java
@@ -158,17 +158,20 @@
void handleParticipantPrimeTest() {
var cacheProvider = mock(CacheProvider.class);
when(cacheProvider.getParticipantId()).thenReturn(CommonTestData.getParticipantId());
+
+ var participantPrime = new ParticipantPrime();
+ participantPrime.setCompositionId(UUID.randomUUID());
+ participantPrime.setMessageId(UUID.randomUUID()); // message id is set before the message is handled
+ participantPrime.setParticipantDefinitionUpdates(List.of(createParticipantDefinition()));
+
var publisher = mock(ParticipantMessagePublisher.class);
var acHandler = mock(AutomationCompositionHandler.class);
var participantHandler = new ParticipantHandler(acHandler, mock(AutomationCompositionOutHandler.class),
publisher, cacheProvider);
- var participantPrime = new ParticipantPrime();
- participantPrime.setCompositionId(UUID.randomUUID());
- participantPrime.setParticipantDefinitionUpdates(List.of(createParticipantDefinition()));
participantHandler.handleParticipantPrime(participantPrime);
verify(cacheProvider).addElementDefinition(any(), any());
- verify(acHandler).prime(any(), any());
+ verify(acHandler).prime(any(), any(), any()); // three matchers; the actual argument values are not checked
}
@Test
@@ -182,9 +185,11 @@
var participantPrime = new ParticipantPrime();
var compositionId = UUID.randomUUID();
participantPrime.setCompositionId(compositionId);
+ var messageId = UUID.randomUUID();
+ participantPrime.setMessageId(messageId);
participantHandler.handleParticipantPrime(participantPrime);
verify(cacheProvider).removeElementDefinition(compositionId);
- verify(acHandler).deprime(compositionId);
+ verify(acHandler).deprime(messageId, compositionId);
}
@Test
diff --git a/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandlerTest.java b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandlerTest.java
new file mode 100644
index 0000000..767a916
--- /dev/null
+++ b/participant/participant-intermediary/src/test/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ThreadHandlerTest.java
@@ -0,0 +1,160 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.handler;
+
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import javax.ws.rs.core.Response.Status;
+import org.junit.jupiter.api.Test;
+import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
+import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
+import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
+import org.onap.policy.clamp.models.acm.concepts.AcTypeState;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElementDefinition;
+import org.onap.policy.clamp.models.acm.concepts.DeployState;
+import org.onap.policy.clamp.models.acm.concepts.LockState;
+import org.onap.policy.clamp.models.acm.concepts.StateChangeResult;
+import org.onap.policy.models.base.PfModelException;
+
+/**
+ * Tests for {@link ThreadHandler}.
+ *
+ * <p>Every expectation is verified with Mockito's {@code timeout} mode, i.e. the test waits up to
+ * {@link #TIMEOUT} milliseconds for the handler to reach the mocked collaborator.
+ */
+class ThreadHandlerTest {
+
+    /** Maximum time, in milliseconds, a verification waits for the expected invocation. */
+    private static final int TIMEOUT = 400;
+
+    /**
+     * Each ThreadHandler operation must call the matching listener method with the same
+     * arguments, without the message id.
+     */
+    @Test
+    void test() throws PfModelException, IOException {
+        var listener = mock(AutomationCompositionElementListener.class);
+        var intermediaryApi = mock(ParticipantIntermediaryApi.class);
+        // try-with-resources: close() shuts the handler's executor down once the test is over.
+        try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) {
+            var compositionId = UUID.randomUUID();
+            var list = List.of(new AutomationCompositionElementDefinition());
+            var messageId = UUID.randomUUID();
+            threadHandler.prime(messageId, compositionId, list);
+            verify(listener, timeout(TIMEOUT)).prime(compositionId, list);
+
+            clearInvocations(listener);
+            var element = new AcElementDeploy();
+            var elementId = UUID.randomUUID();
+            element.setId(elementId);
+            Map<String, Object> properties = Map.of("key", "value");
+            var instanceId = UUID.randomUUID();
+            threadHandler.deploy(messageId, instanceId, element, properties);
+            verify(listener, timeout(TIMEOUT)).deploy(instanceId, element, properties);
+
+            clearInvocations(listener);
+            threadHandler.update(messageId, instanceId, element, properties);
+            verify(listener, timeout(TIMEOUT)).update(instanceId, element, properties);
+
+            clearInvocations(listener);
+            threadHandler.lock(messageId, instanceId, elementId);
+            verify(listener, timeout(TIMEOUT)).lock(instanceId, elementId);
+
+            clearInvocations(listener);
+            threadHandler.unlock(messageId, instanceId, elementId);
+            verify(listener, timeout(TIMEOUT)).unlock(instanceId, elementId);
+
+            clearInvocations(listener);
+            threadHandler.undeploy(messageId, instanceId, elementId);
+            verify(listener, timeout(TIMEOUT)).undeploy(instanceId, elementId);
+
+            clearInvocations(listener);
+            threadHandler.delete(messageId, instanceId, elementId);
+            verify(listener, timeout(TIMEOUT)).delete(instanceId, elementId);
+
+            clearInvocations(listener);
+            threadHandler.deprime(messageId, compositionId);
+            verify(listener, timeout(TIMEOUT)).deprime(compositionId);
+        }
+    }
+
+    /**
+     * When the listener throws a {@link PfModelException}, each ThreadHandler operation must
+     * report the failure through the intermediary API with the expected state and message.
+     */
+    @Test
+    void testException() throws PfModelException, IOException {
+        var listener = mock(AutomationCompositionElementListener.class);
+        var intermediaryApi = mock(ParticipantIntermediaryApi.class);
+        // try-with-resources: close() shuts the handler's executor down once the test is over.
+        try (var threadHandler = new ThreadHandler(listener, intermediaryApi, mock(CacheProvider.class))) {
+            var compositionId = UUID.randomUUID();
+            var list = List.of(new AutomationCompositionElementDefinition());
+            doThrow(failure()).when(listener).prime(compositionId, list);
+            var messageId = UUID.randomUUID();
+            threadHandler.prime(messageId, compositionId, list);
+            verify(intermediaryApi, timeout(TIMEOUT)).updateCompositionState(compositionId,
+                AcTypeState.COMMISSIONED, StateChangeResult.FAILED, "Composition Defintion prime failed");
+
+            // Failures are reported through intermediaryApi, so that is the mock whose recorded
+            // invocations are reset between the steps below.
+            clearInvocations(intermediaryApi);
+            var element = new AcElementDeploy();
+            var elementId = UUID.randomUUID();
+            element.setId(elementId);
+            Map<String, Object> properties = Map.of("key", "value");
+            var instanceId = UUID.randomUUID();
+            doThrow(failure()).when(listener).deploy(instanceId, element, properties);
+            threadHandler.deploy(messageId, instanceId, element, properties);
+            verify(intermediaryApi, timeout(TIMEOUT)).updateAutomationCompositionElementState(instanceId,
+                elementId, DeployState.UNDEPLOYED, null, StateChangeResult.FAILED,
+                "Automation composition element deploy failed");
+
+            clearInvocations(intermediaryApi);
+            doThrow(failure()).when(listener).update(instanceId, element, properties);
+            threadHandler.update(messageId, instanceId, element, properties);
+            verify(intermediaryApi, timeout(TIMEOUT)).updateAutomationCompositionElementState(instanceId,
+                elementId, DeployState.DEPLOYED, null, StateChangeResult.FAILED,
+                "Automation composition element update failed");
+
+            clearInvocations(intermediaryApi);
+            doThrow(failure()).when(listener).lock(instanceId, elementId);
+            threadHandler.lock(messageId, instanceId, elementId);
+            verify(intermediaryApi, timeout(TIMEOUT)).updateAutomationCompositionElementState(instanceId,
+                elementId, null, LockState.UNLOCKED, StateChangeResult.FAILED,
+                "Automation composition element lock failed");
+
+            clearInvocations(intermediaryApi);
+            doThrow(failure()).when(listener).unlock(instanceId, elementId);
+            threadHandler.unlock(messageId, instanceId, elementId);
+            verify(intermediaryApi, timeout(TIMEOUT)).updateAutomationCompositionElementState(instanceId,
+                elementId, null, LockState.LOCKED, StateChangeResult.FAILED,
+                "Automation composition element unlock failed");
+
+            clearInvocations(intermediaryApi);
+            doThrow(failure()).when(listener).undeploy(instanceId, elementId);
+            threadHandler.undeploy(messageId, instanceId, elementId);
+            verify(intermediaryApi, timeout(TIMEOUT)).updateAutomationCompositionElementState(instanceId,
+                elementId, DeployState.DEPLOYED, null, StateChangeResult.FAILED,
+                "Automation composition element undeploy failed");
+
+            clearInvocations(intermediaryApi);
+            doThrow(failure()).when(listener).delete(instanceId, elementId);
+            threadHandler.delete(messageId, instanceId, elementId);
+            verify(intermediaryApi, timeout(TIMEOUT)).updateAutomationCompositionElementState(instanceId,
+                elementId, DeployState.UNDEPLOYED, null, StateChangeResult.FAILED,
+                "Automation composition element delete failed");
+
+            clearInvocations(intermediaryApi);
+            doThrow(failure()).when(listener).deprime(compositionId);
+            threadHandler.deprime(messageId, compositionId);
+            verify(intermediaryApi, timeout(TIMEOUT)).updateCompositionState(compositionId,
+                AcTypeState.PRIMED, StateChangeResult.FAILED, "Composition Defintion deprime failed");
+        }
+    }
+
+    /** Creates a fresh exception for the listener mock to throw. */
+    private static PfModelException failure() {
+        return new PfModelException(Status.INTERNAL_SERVER_ERROR, "Error");
+    }
+}