Add ParticipantReplica repository in ACM

Add ParticipantReplica repository in ACM,
and align topics in properties files.

Issue-ID: POLICY-5045
Change-Id: I283abf91db6264c7b08c51e6ad37736dca147180
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/AcElementRestart.java b/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/AcElementRestart.java
index 3f0dff2..3d1b5df 100644
--- a/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/AcElementRestart.java
+++ b/models/src/main/java/org/onap/policy/clamp/models/acm/concepts/AcElementRestart.java
@@ -44,6 +44,8 @@
     @NonNull
     private ToscaConceptIdentifier definition = new ToscaConceptIdentifier(PfConceptKey.getNullKey());
 
+    private UUID participantId;
+
     // State of the AutomationCompositionElement
     private DeployState deployState;
 
@@ -69,6 +71,7 @@
     public AcElementRestart(final AcElementRestart otherElement) {
         this.id = otherElement.id;
         this.definition = new ToscaConceptIdentifier(otherElement.definition);
+        this.participantId = otherElement.participantId;
         this.deployState = otherElement.deployState;
         this.lockState = otherElement.lockState;
         this.operationalState = otherElement.operationalState;
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java
index 119cdf0..98c7d10 100644
--- a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java
+++ b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantRestart.java
@@ -66,6 +66,7 @@
      */
     public ParticipantRestart(ParticipantRestart source) {
         super(source);
+        this.state = source.state;
         this.participantDefinitionUpdates =
                 PfUtils.mapList(source.participantDefinitionUpdates, ParticipantDefinition::new);
         this.automationcompositionList = PfUtils.mapList(source.automationcompositionList, ParticipantRestartAc::new);
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java
index 33a7309..962b613 100644
--- a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java
+++ b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/kafka/participant/ParticipantSync.java
@@ -20,6 +20,9 @@
 
 package org.onap.policy.clamp.models.acm.messages.kafka.participant;
 
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
@@ -29,6 +32,10 @@
 @ToString(callSuper = true)
 public class ParticipantSync extends ParticipantRestart {
 
+    private Set<UUID> excludeReplicas = new HashSet<>();
+    private boolean restarting = false;
+    private boolean delete = false;
+
     /**
      * Constructor.
      */
@@ -43,5 +50,8 @@
      */
     public ParticipantSync(ParticipantSync source) {
         super(source);
+        this.excludeReplicas = new HashSet<>(source.excludeReplicas);
+        this.restarting = source.restarting;
+        this.delete = source.delete;
     }
 }
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/persistence/provider/ParticipantProvider.java b/models/src/main/java/org/onap/policy/clamp/models/acm/persistence/provider/ParticipantProvider.java
index 3b80aca..b3437c0 100644
--- a/models/src/main/java/org/onap/policy/clamp/models/acm/persistence/provider/ParticipantProvider.java
+++ b/models/src/main/java/org/onap/policy/clamp/models/acm/persistence/provider/ParticipantProvider.java
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- * Copyright (C) 2021-2023 Nordix Foundation.
+ * Copyright (C) 2021-2024 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
 
 package org.onap.policy.clamp.models.acm.persistence.provider;
 
+import jakarta.ws.rs.core.Response;
 import jakarta.ws.rs.core.Response.Status;
 import java.util.HashMap;
 import java.util.List;
@@ -33,9 +34,13 @@
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionElement;
 import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
 import org.onap.policy.clamp.models.acm.concepts.Participant;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantReplica;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
 import org.onap.policy.clamp.models.acm.persistence.concepts.JpaParticipant;
+import org.onap.policy.clamp.models.acm.persistence.concepts.JpaParticipantReplica;
 import org.onap.policy.clamp.models.acm.persistence.repository.AutomationCompositionElementRepository;
 import org.onap.policy.clamp.models.acm.persistence.repository.NodeTemplateStateRepository;
+import org.onap.policy.clamp.models.acm.persistence.repository.ParticipantReplicaRepository;
 import org.onap.policy.clamp.models.acm.persistence.repository.ParticipantRepository;
 import org.onap.policy.models.base.PfModelRuntimeException;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
@@ -56,6 +61,7 @@
 
     private final NodeTemplateStateRepository nodeTemplateStateRepository;
 
+    private final ParticipantReplicaRepository replicaRepository;
 
     /**
      * Get all participants.
@@ -142,7 +148,6 @@
         return jpaDeleteParticipantOpt.get().toAuthorative();
     }
 
-
     /**
      * Get a map with SupportedElement as key and the participantId as value.
      *
@@ -160,7 +165,6 @@
         return map;
     }
 
-
     /**
      * Retrieve a list of automation composition elements associated with a participantId.
      *
@@ -186,11 +190,64 @@
     /**
      * Get a list of compositionId associated with a participantId from ac definitions.
      * @param participantId the participant id associated with the automation composition elements
-     * @return the list of compositionId
+     * @return the set of compositionId
      */
     public Set<UUID> getCompositionIds(@NonNull final UUID participantId) {
         return nodeTemplateStateRepository.findByParticipantId(participantId.toString()).stream()
                 .map(nodeTemplateState -> UUID.fromString(nodeTemplateState.getCompositionId()))
                 .collect(Collectors.toSet());
     }
+
+    /**
+     * Get participant replica.
+     *
+     * @param replicaId the Id of the replica to get
+     * @return the replica found
+     */
+    @Transactional(readOnly = true)
+    public Optional<ParticipantReplica> findParticipantReplica(@NonNull final UUID replicaId) {
+        return replicaRepository.findById(replicaId.toString()).map(JpaParticipantReplica::toAuthorative);
+    }
+
+    /**
+     * Save participant replica.
+     *
+     * @param replica replica to save
+     */
+    public void saveParticipantReplica(@NonNull final ParticipantReplica replica) {
+        var jpa = replicaRepository.getReferenceById(replica.getReplicaId().toString());
+        jpa.fromAuthorative(replica);
+        replicaRepository.save(jpa);
+    }
+
+    /**
+     * Delete participant replica.
+     *
+     * @param replicaId the Id of the replica to delete
+     */
+    public void deleteParticipantReplica(@NonNull UUID replicaId) {
+        replicaRepository.deleteById(replicaId.toString());
+    }
+
+    public List<ParticipantReplica> findReplicasOnLine() {
+        return ProviderUtils.asEntityList(replicaRepository.findByParticipantState(ParticipantState.ON_LINE));
+    }
+
+    /**
+     * Verify Participant state.
+     *
+     * @param participantIds The list of UUIDs of the participants to get
+     * @throws  PfModelRuntimeException in case the participant is offline
+     */
+    public void verifyParticipantState(Set<UUID> participantIds) {
+        for (UUID participantId : participantIds) {
+            var jpaParticipant = participantRepository.getReferenceById(participantId.toString());
+            var replicaOnline = jpaParticipant.getReplicas().stream()
+                    .filter(replica -> ParticipantState.ON_LINE.equals(replica.getParticipantState())).findFirst();
+            if (replicaOnline.isEmpty()) {
+                throw new PfModelRuntimeException(Response.Status.CONFLICT,
+                        "Participant: " + participantId + " is OFFLINE");
+            }
+        }
+    }
 }
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/persistence/repository/ParticipantReplicaRepository.java b/models/src/main/java/org/onap/policy/clamp/models/acm/persistence/repository/ParticipantReplicaRepository.java
new file mode 100644
index 0000000..c9621ee
--- /dev/null
+++ b/models/src/main/java/org/onap/policy/clamp/models/acm/persistence/repository/ParticipantReplicaRepository.java
@@ -0,0 +1,33 @@
+/*-
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.models.acm.persistence.repository;
+
+import java.util.List;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
+import org.onap.policy.clamp.models.acm.persistence.concepts.JpaParticipantReplica;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.stereotype.Repository;
+
+@Repository
+public interface ParticipantReplicaRepository extends JpaRepository<JpaParticipantReplica, String> {
+
+    List<JpaParticipantReplica> findByParticipantState(ParticipantState participantState);
+}
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/utils/AcmUtils.java b/models/src/main/java/org/onap/policy/clamp/models/acm/utils/AcmUtils.java
index 5f523ad..f19d5db 100644
--- a/models/src/main/java/org/onap/policy/clamp/models/acm/utils/AcmUtils.java
+++ b/models/src/main/java/org/onap/policy/clamp/models/acm/utils/AcmUtils.java
@@ -441,6 +441,7 @@
         var acElementRestart = new AcElementRestart();
         acElementRestart.setId(element.getId());
         acElementRestart.setDefinition(new ToscaConceptIdentifier(element.getDefinition()));
+        acElementRestart.setParticipantId(element.getParticipantId());
         acElementRestart.setDeployState(element.getDeployState());
         acElementRestart.setLockState(element.getLockState());
         acElementRestart.setOperationalState(element.getOperationalState());
diff --git a/models/src/test/java/org/onap/policy/clamp/models/acm/persistence/provider/ParticipantProviderTest.java b/models/src/test/java/org/onap/policy/clamp/models/acm/persistence/provider/ParticipantProviderTest.java
index 2a5c15a..9ceeef6 100644
--- a/models/src/test/java/org/onap/policy/clamp/models/acm/persistence/provider/ParticipantProviderTest.java
+++ b/models/src/test/java/org/onap/policy/clamp/models/acm/persistence/provider/ParticipantProviderTest.java
@@ -1,6 +1,6 @@
 /*-
  * ============LICENSE_START=======================================================
- * Copyright (C) 2021-2023 Nordix Foundation.
+ * Copyright (C) 2021-2024 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -26,11 +26,14 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -38,12 +41,15 @@
 import org.onap.policy.clamp.models.acm.concepts.AutomationCompositions;
 import org.onap.policy.clamp.models.acm.concepts.NodeTemplateState;
 import org.onap.policy.clamp.models.acm.concepts.Participant;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantState;
 import org.onap.policy.clamp.models.acm.persistence.concepts.JpaAutomationComposition;
 import org.onap.policy.clamp.models.acm.persistence.concepts.JpaNodeTemplateState;
 import org.onap.policy.clamp.models.acm.persistence.concepts.JpaParticipant;
 import org.onap.policy.clamp.models.acm.persistence.repository.AutomationCompositionElementRepository;
 import org.onap.policy.clamp.models.acm.persistence.repository.NodeTemplateStateRepository;
+import org.onap.policy.clamp.models.acm.persistence.repository.ParticipantReplicaRepository;
 import org.onap.policy.clamp.models.acm.persistence.repository.ParticipantRepository;
+import org.onap.policy.clamp.models.acm.utils.CommonTestData;
 import org.onap.policy.common.utils.coder.Coder;
 import org.onap.policy.common.utils.coder.StandardCoder;
 import org.onap.policy.common.utils.resources.ResourceUtils;
@@ -63,27 +69,24 @@
 
     private final List<Participant> inputParticipants = new ArrayList<>();
     private List<JpaParticipant> jpaParticipantList;
-    private final String originalJson = ResourceUtils.getResourceAsString(PARTICIPANT_JSON);
-
-    private AutomationCompositions inputAutomationCompositions;
     private List<JpaAutomationComposition> inputAutomationCompositionsJpa;
 
-    private final String originalAcJson = ResourceUtils.getResourceAsString(AUTOMATION_COMPOSITION_JSON);
-    private final String nodeTemplateStatesJson = ResourceUtils.getResourceAsString(NODE_TEMPLATE_STATE_JSON);
-
-    private List<NodeTemplateState> nodeTemplateStateList = new ArrayList<>();
+    private final List<NodeTemplateState> nodeTemplateStateList = new ArrayList<>();
     private List<JpaNodeTemplateState> jpaNodeTemplateStateList;
 
     @BeforeEach
     void beforeSetup() throws Exception {
+        var originalJson = ResourceUtils.getResourceAsString(PARTICIPANT_JSON);
         inputParticipants.add(CODER.decode(originalJson, Participant.class));
         jpaParticipantList = ProviderUtils.getJpaAndValidateList(inputParticipants, JpaParticipant::new, "participant");
 
-        inputAutomationCompositions = CODER.decode(originalAcJson, AutomationCompositions.class);
+        var originalAcJson = ResourceUtils.getResourceAsString(AUTOMATION_COMPOSITION_JSON);
+        var inputAutomationCompositions = CODER.decode(originalAcJson, AutomationCompositions.class);
         inputAutomationCompositionsJpa =
             ProviderUtils.getJpaAndValidateList(inputAutomationCompositions.getAutomationCompositionList(),
                 JpaAutomationComposition::new, "automation compositions");
 
+        var nodeTemplateStatesJson = ResourceUtils.getResourceAsString(NODE_TEMPLATE_STATE_JSON);
         nodeTemplateStateList.add(CODER.decode(nodeTemplateStatesJson, NodeTemplateState.class));
         nodeTemplateStateList.get(0).setState(AcTypeState.COMMISSIONED);
         jpaNodeTemplateStateList = ProviderUtils.getJpaAndValidateList(nodeTemplateStateList,
@@ -97,7 +100,8 @@
         var nodeTemplateStateRepository = mock(NodeTemplateStateRepository.class);
 
         var participantProvider = new ParticipantProvider(participantRepository,
-            automationCompositionElementRepository, nodeTemplateStateRepository);
+            automationCompositionElementRepository, nodeTemplateStateRepository,
+            mock(ParticipantReplicaRepository.class));
 
         assertThatThrownBy(() -> participantProvider.saveParticipant(null)).hasMessageMatching(LIST_IS_NULL);
 
@@ -116,7 +120,8 @@
         var nodeTemplateStateRepository = mock(NodeTemplateStateRepository.class);
 
         var participantProvider = new ParticipantProvider(participantRepository,
-            automationCompositionElementRepository, nodeTemplateStateRepository);
+            automationCompositionElementRepository, nodeTemplateStateRepository,
+            mock(ParticipantReplicaRepository.class));
 
         assertThatThrownBy(() -> participantProvider.updateParticipant(null)).hasMessageMatching(LIST_IS_NULL);
 
@@ -133,7 +138,8 @@
         var automationCompositionElementRepository = mock(AutomationCompositionElementRepository.class);
         var nodeTemplateStateRepository = mock(NodeTemplateStateRepository.class);
         var participantProvider = new ParticipantProvider(participantRepository,
-            automationCompositionElementRepository, nodeTemplateStateRepository);
+            automationCompositionElementRepository, nodeTemplateStateRepository,
+            mock(ParticipantReplicaRepository.class));
 
         assertThat(participantProvider.findParticipant(INVALID_ID)).isEmpty();
 
@@ -156,7 +162,8 @@
         var automationCompositionElementRepository = mock(AutomationCompositionElementRepository.class);
         var nodeTemplateStateRepository = mock(NodeTemplateStateRepository.class);
         var participantProvider = new ParticipantProvider(participantRepository,
-            automationCompositionElementRepository, nodeTemplateStateRepository);
+            automationCompositionElementRepository, nodeTemplateStateRepository,
+            mock(ParticipantReplicaRepository.class));
 
         assertThatThrownBy(() -> participantProvider.getParticipantById(INVALID_ID)).isInstanceOf(
             PfModelRuntimeException.class).hasMessageMatching("Participant Not Found with ID:.*.");
@@ -168,7 +175,8 @@
         var automationCompositionElementRepository = mock(AutomationCompositionElementRepository.class);
         var nodeTemplateStateRepository = mock(NodeTemplateStateRepository.class);
         var participantProvider = new ParticipantProvider(participantRepository,
-            automationCompositionElementRepository, nodeTemplateStateRepository);
+            automationCompositionElementRepository, nodeTemplateStateRepository,
+            mock(ParticipantReplicaRepository.class));
 
         var participantId = inputParticipants.get(0).getParticipantId();
         assertThatThrownBy(() -> participantProvider.deleteParticipant(participantId))
@@ -187,13 +195,14 @@
         var automationCompositionElementRepository = mock(AutomationCompositionElementRepository.class);
         var nodeTemplateStateRepository = mock(NodeTemplateStateRepository.class);
         var participantProvider = new ParticipantProvider(participantRepository,
-            automationCompositionElementRepository, nodeTemplateStateRepository);
+            automationCompositionElementRepository, nodeTemplateStateRepository,
+            mock(ParticipantReplicaRepository.class));
 
         var acElementList = inputAutomationCompositionsJpa.get(0).getElements();
 
         var participantId = UUID.randomUUID();
         when(automationCompositionElementRepository.findByParticipantId(participantId.toString()))
-                .thenReturn(acElementList);
+            .thenReturn(acElementList);
 
         var listOfAcElements = participantProvider.getAutomationCompositionElements(participantId);
 
@@ -210,7 +219,8 @@
         when(nodeTemplateStateRepository.findByParticipantId(participantId)).thenReturn(jpaNodeTemplateStateList);
 
         var participantProvider = new ParticipantProvider(participantRepository,
-            automationCompositionElementRepository, nodeTemplateStateRepository);
+            automationCompositionElementRepository, nodeTemplateStateRepository,
+            mock(ParticipantReplicaRepository.class));
 
         var listOfNodeTemplateState = participantProvider.getAcNodeTemplateStates(UUID.fromString(participantId));
 
@@ -224,7 +234,8 @@
         var nodeTemplateStateRepository = mock(NodeTemplateStateRepository.class);
 
         var participantProvider = new ParticipantProvider(participantRepository,
-            automationCompositionElementRepository, nodeTemplateStateRepository);
+            automationCompositionElementRepository, nodeTemplateStateRepository,
+            mock(ParticipantReplicaRepository.class));
 
         assertThrows(NullPointerException.class, () -> participantProvider.getParticipantById(null));
         assertThrows(NullPointerException.class, () -> participantProvider.findParticipant(null));
@@ -233,6 +244,9 @@
         assertThrows(NullPointerException.class, () -> participantProvider.deleteParticipant(null));
         assertThrows(NullPointerException.class, () -> participantProvider.getAutomationCompositionElements(null));
         assertThrows(NullPointerException.class, () -> participantProvider.getAcNodeTemplateStates(null));
+        assertThrows(NullPointerException.class, () -> participantProvider.findParticipantReplica(null));
+        assertThrows(NullPointerException.class, () -> participantProvider.saveParticipantReplica(null));
+        assertThrows(NullPointerException.class, () -> participantProvider.deleteParticipantReplica(null));
     }
 
     @Test
@@ -242,7 +256,8 @@
         var nodeTemplateStateRepository = mock(NodeTemplateStateRepository.class);
         when(participantRepository.findAll()).thenReturn(jpaParticipantList);
         var participantProvider = new ParticipantProvider(participantRepository,
-            automationCompositionElementRepository, nodeTemplateStateRepository);
+            automationCompositionElementRepository, nodeTemplateStateRepository,
+            mock(ParticipantReplicaRepository.class));
 
         var result = participantProvider.getSupportedElementMap();
         assertThat(result).hasSize(2);
@@ -258,11 +273,89 @@
         var automationCompositionElementRepository = mock(AutomationCompositionElementRepository.class);
 
         var participantProvider = new ParticipantProvider(participantRepository,
-            automationCompositionElementRepository, nodeTemplateStateRepository);
+            automationCompositionElementRepository, nodeTemplateStateRepository,
+            mock(ParticipantReplicaRepository.class));
 
         assertThatThrownBy(() -> participantProvider.getCompositionIds(null)).hasMessageMatching(LIST_IS_NULL);
 
         var result = participantProvider.getCompositionIds(participantId);
         assertThat(result).hasSize(1);
     }
+
+    @Test
+    void testFindParticipantReplica() {
+        var replicaRepository = mock(ParticipantReplicaRepository.class);
+        var replica = inputParticipants.get(0).getReplicas().values().iterator().next();
+        var jpaReplica = jpaParticipantList.get(0).getReplicas().get(0);
+        when(replicaRepository.findById(replica.getReplicaId().toString())).thenReturn(Optional.of(jpaReplica));
+        var participantProvider = new ParticipantProvider(mock(ParticipantRepository.class),
+                mock(AutomationCompositionElementRepository.class), mock(NodeTemplateStateRepository.class),
+                replicaRepository);
+
+        var result = participantProvider.findParticipantReplica(replica.getReplicaId());
+        assertThat(result).isNotEmpty();
+        assertEquals(replica.getReplicaId(), result.get().getReplicaId());
+    }
+
+    @Test
+    void testFindReplicasOnLine() {
+        var replicaRepository = mock(ParticipantReplicaRepository.class);
+        var replica = inputParticipants.get(0).getReplicas().values().iterator().next();
+        var jpaReplica = jpaParticipantList.get(0).getReplicas().get(0);
+        jpaReplica.fromAuthorative(replica);
+        when(replicaRepository.findByParticipantState(ParticipantState.ON_LINE)).thenReturn(List.of(jpaReplica));
+        var participantProvider = new ParticipantProvider(mock(ParticipantRepository.class),
+                mock(AutomationCompositionElementRepository.class), mock(NodeTemplateStateRepository.class),
+                replicaRepository);
+
+        var result = participantProvider.findReplicasOnLine();
+        assertThat(result).hasSize(1);
+        assertEquals(replica.getReplicaId(), result.get(0).getReplicaId());
+    }
+
+    @Test
+    void testSaveParticipantReplica() {
+        var jpaReplica = jpaParticipantList.get(0).getReplicas().get(0);
+        var replicaRepository = mock(ParticipantReplicaRepository.class);
+        when(replicaRepository.getReferenceById(jpaReplica.getReplicaId())).thenReturn(jpaReplica);
+        var participantProvider = new ParticipantProvider(mock(ParticipantRepository.class),
+                mock(AutomationCompositionElementRepository.class), mock(NodeTemplateStateRepository.class),
+                replicaRepository);
+
+        var replica = inputParticipants.get(0).getReplicas().values().iterator().next();
+        participantProvider.saveParticipantReplica(replica);
+        verify(replicaRepository).save(any());
+    }
+
+    @Test
+    void testDeleteParticipantReplica() {
+        var replicaRepository = mock(ParticipantReplicaRepository.class);
+        var participantProvider = new ParticipantProvider(mock(ParticipantRepository.class),
+                mock(AutomationCompositionElementRepository.class), mock(NodeTemplateStateRepository.class),
+                replicaRepository);
+        participantProvider.deleteParticipantReplica(CommonTestData.getReplicaId());
+        verify(replicaRepository).deleteById(CommonTestData.getReplicaId().toString());
+    }
+
+    @Test
+    void testVerifyParticipantState() {
+        var jpaParticipant = new JpaParticipant(jpaParticipantList.get(0));
+        var participantId = jpaParticipant.getParticipantId();
+        var participantRepository = mock(ParticipantRepository.class);
+        when(participantRepository.getReferenceById(participantId)).thenReturn(jpaParticipant);
+
+        var replicaRepository = mock(ParticipantReplicaRepository.class);
+        var participantProvider = new ParticipantProvider(participantRepository,
+                mock(AutomationCompositionElementRepository.class), mock(NodeTemplateStateRepository.class),
+                replicaRepository);
+
+        jpaParticipant.setReplicas(List.of());
+        var set = Set.of(UUID.fromString(participantId));
+        assertThatThrownBy(() -> participantProvider.verifyParticipantState(set))
+                .hasMessageMatching("Participant: " + participantId + " is OFFLINE");
+
+        when(participantRepository.getReferenceById(participantId)).thenReturn(jpaParticipantList.get(0));
+        participantProvider.verifyParticipantState(set);
+        verify(participantRepository, times(2)).getReferenceById(participantId);
+    }
 }
diff --git a/models/src/test/java/org/onap/policy/clamp/models/acm/utils/CommonTestData.java b/models/src/test/java/org/onap/policy/clamp/models/acm/utils/CommonTestData.java
index 3bd1549..1d111b8 100644
--- a/models/src/test/java/org/onap/policy/clamp/models/acm/utils/CommonTestData.java
+++ b/models/src/test/java/org/onap/policy/clamp/models/acm/utils/CommonTestData.java
@@ -37,7 +37,7 @@
  */
 public class CommonTestData {
 
-    public static final UUID PARTCICIPANT_ID = UUID.randomUUID();
+    public static final UUID PARTICIPANT_ID = UUID.randomUUID();
     public static final UUID REPLICA_ID = UUID.randomUUID();
     private static final StandardYamlCoder YAML_TRANSLATOR = new StandardYamlCoder();
 
@@ -47,7 +47,7 @@
      * @return participant Id
      */
     public static UUID getParticipantId() {
-        return PARTCICIPANT_ID;
+        return PARTICIPANT_ID;
     }
 
     /**
@@ -65,7 +65,7 @@
      * @return participant Id
      */
     public static String getJpaParticipantId() {
-        return PARTCICIPANT_ID.toString();
+        return PARTICIPANT_ID.toString();
     }
 
     /**
diff --git a/packages/policy-clamp-tarball/src/main/resources/etc/A1pmsParticipantParameters.yaml b/packages/policy-clamp-tarball/src/main/resources/etc/A1pmsParticipantParameters.yaml
index dcb1552..01fd439 100644
--- a/packages/policy-clamp-tarball/src/main/resources/etc/A1pmsParticipantParameters.yaml
+++ b/packages/policy-clamp-tarball/src/main/resources/etc/A1pmsParticipantParameters.yaml
@@ -23,18 +23,26 @@
 
 participant:
   intermediaryParameters:
+    topics:
+      operationTopic: policy-acruntime-participant
+      syncTopic: acm-ppnt-sync
     reportingTimeIntervalMs: 120000
     description: Participant Description
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c00
     clampAutomationCompositionTopics:
       topicSources:
-        - topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
+          servers:
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: NOOP
+          fetchTimeout: 15000
+        - topic: ${participant.intermediaryParameters.topics.syncTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
       topicSinks:
-        - topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
@@ -52,4 +60,4 @@
 server:
   port: 8086
   servlet:
-    context-path: /onap/policy/clamp/acm/a1pmsparticipant
\ No newline at end of file
+    context-path: /onap/policy/clamp/acm/a1pmsparticipant
diff --git a/packages/policy-clamp-tarball/src/main/resources/etc/AcRuntimeParameters.yaml b/packages/policy-clamp-tarball/src/main/resources/etc/AcRuntimeParameters.yaml
index 302fe21..b8723ae 100644
--- a/packages/policy-clamp-tarball/src/main/resources/etc/AcRuntimeParameters.yaml
+++ b/packages/policy-clamp-tarball/src/main/resources/etc/AcRuntimeParameters.yaml
@@ -38,13 +38,16 @@
     path: /error
 
 runtime:
+  topics:
+    operationTopic: policy-acruntime-participant
+    syncTopic: acm-ppnt-sync
   participantParameters:
     heartBeatMs: 20000
     maxStatusWaitMs: 200000
   topicParameterGroup:
     topicSources:
       -
-        topic: policy-acruntime-participant
+        topic: ${runtime.topics.operationTopic}
         servers:
           - ${topicServer:kafka:9092}
         topicCommInfrastructure: NOOP
@@ -52,11 +55,20 @@
         useHttps: true
     topicSinks:
       -
-        topic: policy-acruntime-participant
+        topic: ${runtime.topics.operationTopic}
         servers:
           - ${topicServer:kafka:9092}
         topicCommInfrastructure: NOOP
         useHttps: true
+      -
+        topic: ${runtime.topics.syncTopic}
+        servers:
+          - ${topicServer:kafka:9092}
+        topicCommInfrastructure: NOOP
+        useHttps: true
+  acmParameters:
+    toscaElementName: org.onap.policy.clamp.acm.AutomationCompositionElement
+    toscaCompositionName: org.onap.policy.clamp.acm.AutomationComposition
 
 management:
   endpoints:
diff --git a/packages/policy-clamp-tarball/src/main/resources/etc/HttpParticipantParameters.yaml b/packages/policy-clamp-tarball/src/main/resources/etc/HttpParticipantParameters.yaml
index d301409..58105ad 100644
--- a/packages/policy-clamp-tarball/src/main/resources/etc/HttpParticipantParameters.yaml
+++ b/packages/policy-clamp-tarball/src/main/resources/etc/HttpParticipantParameters.yaml
@@ -14,19 +14,28 @@
   enable-csrf: false
 participant:
   intermediaryParameters:
+    topics:
+      operationTopic: policy-acruntime-participant
+      syncTopic: acm-ppnt-sync
     reportingTimeIntervalMs: 120000
     description: Participant Description
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c01
     clampAutomationCompositionTopics:
       topicSources:
-        - topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
+          servers:
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: NOOP
+          fetchTimeout: 15000
+          useHttps: true
+        - topic: ${participant.intermediaryParameters.topics.syncTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
           useHttps: true
       topicSinks:
-        - topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
diff --git a/packages/policy-clamp-tarball/src/main/resources/etc/KserveParticipantParameters.yaml b/packages/policy-clamp-tarball/src/main/resources/etc/KserveParticipantParameters.yaml
index 6be44c1..79756b7 100644
--- a/packages/policy-clamp-tarball/src/main/resources/etc/KserveParticipantParameters.yaml
+++ b/packages/policy-clamp-tarball/src/main/resources/etc/KserveParticipantParameters.yaml
@@ -21,21 +21,33 @@
 
 participant:
   intermediaryParameters:
+    topics:
+      operationTopic: policy-acruntime-participant
+      syncTopic: acm-ppnt-sync
     reportingTimeIntervalMs: 120000
     description: Participant Description
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c04
     clampAutomationCompositionTopics:
       topicSources:
-        - topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
+          servers:
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: NOOP
+          fetchTimeout: 15000
+        - topic: ${participant.intermediaryParameters.topics.syncTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
       topicSinks:
-        - topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
+    participantSupportedElementTypes:
+      -
+        typeName: org.onap.policy.clamp.acm.KserveAutomationCompositionElement
+        typeVersion: 1.0.1
 
 customresourcedefinition:
   group: serving.kserve.io
diff --git a/packages/policy-clamp-tarball/src/main/resources/etc/KubernetesParticipantParameters.yaml b/packages/policy-clamp-tarball/src/main/resources/etc/KubernetesParticipantParameters.yaml
index b175873..1bab3b7 100644
--- a/packages/policy-clamp-tarball/src/main/resources/etc/KubernetesParticipantParameters.yaml
+++ b/packages/policy-clamp-tarball/src/main/resources/etc/KubernetesParticipantParameters.yaml
@@ -17,13 +17,23 @@
   localChartDirectory: /home/policy/local-charts
   infoFileName: CHART_INFO.json
   intermediaryParameters:
+    topics:
+      operationTopic: policy-acruntime-participant
+      syncTopic: acm-ppnt-sync
     reportingTimeIntervalMs: 120000
     description: Participant Description
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c02
     clampAutomationCompositionTopics:
       topicSources:
         -
-          topic: policy-acruntime-participant
+          topic: ${participant.intermediaryParameters.topics.operationTopic}
+          servers:
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: NOOP
+          fetchTimeout: 15000
+          useHttps: true
+        -
+          topic: ${participant.intermediaryParameters.topics.syncTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
@@ -31,7 +41,7 @@
           useHttps: true
       topicSinks:
         -
-          topic: policy-acruntime-participant
+          topic: ${participant.intermediaryParameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
diff --git a/packages/policy-clamp-tarball/src/main/resources/etc/PolicyParticipantParameters.yaml b/packages/policy-clamp-tarball/src/main/resources/etc/PolicyParticipantParameters.yaml
index 5f76818..1b85548 100644
--- a/packages/policy-clamp-tarball/src/main/resources/etc/PolicyParticipantParameters.yaml
+++ b/packages/policy-clamp-tarball/src/main/resources/etc/PolicyParticipantParameters.yaml
@@ -30,13 +30,23 @@
     useHttps: true
     allowSelfSignedCerts: true
   intermediaryParameters:
+    topics:
+      operationTopic: policy-acruntime-participant
+      syncTopic: acm-ppnt-sync
     reportingTimeIntervalMs: 120000
     description: Participant Description
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c03
     clampAutomationCompositionTopics:
       topicSources:
         -
-          topic: policy-acruntime-participant
+          topic: ${participant.intermediaryParameters.topics.operationTopic}
+          servers:
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: NOOP
+          fetchTimeout: 15000
+          useHttps: true
+        -
+          topic: ${participant.intermediaryParameters.topics.syncTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
@@ -44,7 +54,7 @@
           useHttps: true
       topicSinks:
         -
-          topic: policy-acruntime-participant
+          topic: ${participant.intermediaryParameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
diff --git a/packages/policy-clamp-tarball/src/main/resources/etc/SimulatorParticipantParameters.yaml b/packages/policy-clamp-tarball/src/main/resources/etc/SimulatorParticipantParameters.yaml
index d1b3cbb..962dcf6 100644
--- a/packages/policy-clamp-tarball/src/main/resources/etc/SimulatorParticipantParameters.yaml
+++ b/packages/policy-clamp-tarball/src/main/resources/etc/SimulatorParticipantParameters.yaml
@@ -14,19 +14,28 @@
   enable-csrf: false
 participant:
   intermediaryParameters:
+    topics:
+      operationTopic: policy-acruntime-participant
+      syncTopic: acm-ppnt-sync
     reportingTimeIntervalMs: 120000
     description: Participant Description
     participantId: ${participantId:101c62b3-8918-41b9-a747-d21eb79c6c90}
     clampAutomationCompositionTopics:
       topicSources:
-        - topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
+          servers:
+            - ${topicServer:kafka:9092}
+          topicCommInfrastructure: NOOP
+          fetchTimeout: 15000
+          useHttps: ${useHttps:true}
+        - topic: ${participant.intermediaryParameters.topics.syncTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
           useHttps: ${useHttps:true}
       topicSinks:
-        - topic: policy-acruntime-participant
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
diff --git a/participant/participant-impl/participant-impl-a1pms/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-a1pms/src/main/resources/config/application.yaml
index 18ffde6..95b0781 100644
--- a/participant/participant-impl/participant-impl-a1pms/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-a1pms/src/main/resources/config/application.yaml
@@ -31,18 +31,18 @@
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c00
     clampAutomationCompositionTopics:
       topicSources:
-        - topic: ${participant.intermediaryparameters.topics.operationTopic}
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
-        - topic: ${participant.intermediaryparameters.topics.syncTopic}
+        - topic: ${participant.intermediaryParameters.topics.syncTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
       topicSinks:
-        - topic: ${participant.intermediaryparameters.topics.operationTopic}
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
diff --git a/participant/participant-impl/participant-impl-http/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-http/src/main/resources/config/application.yaml
index 9e86d49..539d30f 100644
--- a/participant/participant-impl/participant-impl-http/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-http/src/main/resources/config/application.yaml
@@ -22,18 +22,18 @@
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c01
     clampAutomationCompositionTopics:
       topicSources:
-        - topic: ${participant.intermediaryparameters.topics.operationTopic}
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
-        - topic: ${participant.intermediaryparameters.topics.syncTopic}
+        - topic: ${participant.intermediaryParameters.topics.syncTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
       topicSinks:
-        - topic: ${participant.intermediaryparameters.topics.operationTopic}
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
diff --git a/participant/participant-impl/participant-impl-kserve/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-kserve/src/main/resources/config/application.yaml
index 6ccd2fc..29efba3 100644
--- a/participant/participant-impl/participant-impl-kserve/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-kserve/src/main/resources/config/application.yaml
@@ -30,18 +30,18 @@
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c04
     clampAutomationCompositionTopics:
       topicSources:
-        - topic: ${participant.intermediaryparameters.topics.operationTopic}
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
-        - topic: ${participant.intermediaryparameters.topics.syncTopic}
+        - topic: ${participant.intermediaryParameters.topics.syncTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
       topicSinks:
-        - topic: ${participant.intermediaryparameters.topics.operationTopic}
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
diff --git a/participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml
index d66faee..8991a49 100644
--- a/participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml
@@ -31,18 +31,18 @@
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c02
     clampAutomationCompositionTopics:
       topicSources:
-        - topic: ${participant.intermediaryparameters.topics.operationTopic}
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
-        - topic: ${participant.intermediaryparameters.topics.syncTopic}
+        - topic: ${participant.intermediaryParameters.topics.syncTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
       topicSinks:
-        - topic: ${participant.intermediaryparameters.topics.operationTopic}
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
diff --git a/participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml
index 7a0ef8d..9411dff 100644
--- a/participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml
@@ -38,18 +38,18 @@
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c03
     clampAutomationCompositionTopics:
       topicSources:
-        - topic: ${participant.intermediaryparameters.topics.operationTopic}
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
-        - topic: ${participant.intermediaryparameters.topics.syncTopic}
+        - topic: ${participant.intermediaryParameters.topics.syncTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
       topicSinks:
-        - topic: ${participant.intermediaryparameters.topics.operationTopic}
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
diff --git a/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml
index 77a3ef4..3f4d794 100644
--- a/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml
+++ b/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml
@@ -22,18 +22,18 @@
     participantId: 101c62b3-8918-41b9-a747-d21eb79c6c90
     clampAutomationCompositionTopics:
       topicSources:
-        - topic: ${participant.intermediaryparameters.topics.operationTopic}
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
-        - topic: ${participant.intermediaryparameters.topics.syncTopic}
+        - topic: ${participant.intermediaryParameters.topics.syncTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP
           fetchTimeout: 15000
       topicSinks:
-        - topic: ${participant.intermediaryparameters.topics.operationTopic}
+        - topic: ${participant.intermediaryParameters.topics.operationTopic}
           servers:
             - ${topicServer:kafka:9092}
           topicCommInfrastructure: NOOP