Bugfix, only one RIC was synched
In RicSupervision, when recovery was started, cheching of the other ones was
stopped.
Other improvements such as:
Not starting consistency check when it is already started
Improved tracing
Change-Id: I82d1d48a091de8f24ebfa60b7cc1140e81d959f6
Issue-ID: NONRTRIC-164
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java
index def2b30..ed94492 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Lock.java
@@ -130,7 +130,8 @@
@Override
public String toString() {
- return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive;
+ return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive + " queued: "
+ + this.lockRequestQueue.size();
}
/** returns the current number of granted locks */
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java
index 07d2cda..fb2e4b9 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Ric.java
@@ -152,6 +152,11 @@
/**
* The agent is synchronizing the view of the Ric.
*/
- SYNCHRONIZING
+ SYNCHRONIZING,
+
+ /**
+ * A consistency check between the agent and the Ric is done
+ */
+ CONSISTENCY_CHECK
}
}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java
index 66cecd3..7faa376 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java
@@ -20,6 +20,7 @@
package org.oransc.policyagent.repository;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -37,7 +38,7 @@
registeredRics.put(ric.name(), ric);
}
- public synchronized Iterable<Ric> getRics() {
+ public synchronized Collection<Ric> getRics() {
return new Vector<>(registeredRics.values());
}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java
index 2666d60..77be8b3 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java
@@ -24,6 +24,7 @@
import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.clients.A1ClientFactory;
+import org.oransc.policyagent.exceptions.ServiceException;
import org.oransc.policyagent.repository.Lock.LockType;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.PolicyTypes;
@@ -58,6 +59,14 @@
private final A1ClientFactory a1ClientFactory;
private final Services services;
+ private static class SynchStartedException extends ServiceException {
+ private static final long serialVersionUID = 1L;
+
+ public SynchStartedException(String message) {
+ super(message);
+ }
+ }
+
@Autowired
public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
Services services) {
@@ -80,20 +89,48 @@
private Flux<RicData> createTask() {
return Flux.fromIterable(rics.getRics()) //
.flatMap(this::createRicData) //
- .flatMap(this::checkOneRic) //
- .onErrorResume(throwable -> Mono.empty());
+ .flatMap(this::checkOneRic);
}
private Mono<RicData> checkOneRic(RicData ricData) {
return checkRicState(ricData) //
.flatMap(x -> ricData.ric.getLock().lock(LockType.EXCLUSIVE)) //
+ .flatMap(notUsed -> setRicState(ricData)) //
.flatMap(x -> checkRicPolicies(ricData)) //
- .flatMap(x -> ricData.ric.getLock().unlock()) //
- .doOnError(throwable -> ricData.ric.getLock().unlockBlocking()) //
.flatMap(x -> checkRicPolicyTypes(ricData)) //
- .doOnNext(x -> logger.debug("Ric: {} checked OK", ricData.ric.name())) //
- .doOnError(t -> logger.debug("Ric: {} check Failed, exception: {}", ricData.ric.name(), t.getMessage()));
+ .doOnNext(x -> onRicCheckedOk(ricData)) //
+ .doOnError(t -> onRicCheckedError(t, ricData)) //
+ .onErrorResume(throwable -> Mono.empty());
+ }
+
+ private void onRicCheckedError(Throwable t, RicData ricData) {
+ logger.debug("Ric: {} check stopped, exception: {}", ricData.ric.name(), t.getMessage());
+ if (t instanceof SynchStartedException) {
+ // this is just a temporary state,
+ ricData.ric.setState(RicState.AVAILABLE);
+ } else {
+ ricData.ric.setState(RicState.UNAVAILABLE);
+ }
+ ricData.ric.getLock().unlockBlocking();
+ }
+
+ private void onRicCheckedOk(RicData ricData) {
+ logger.debug("Ric: {} checked OK", ricData.ric.name());
+ ricData.ric.setState(RicState.AVAILABLE);
+ ricData.ric.getLock().unlockBlocking();
+ }
+
+ @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
+ private Mono<RicData> setRicState(RicData ric) {
+ synchronized (ric) {
+ if (ric.ric.getState() == RicState.CONSISTENCY_CHECK) {
+ logger.debug("Ric: {} is already being checked", ric.ric.getConfig().name());
+ return Mono.empty();
+ }
+ ric.ric.setState(RicState.CONSISTENCY_CHECK);
+ return Mono.just(ric);
+ }
}
private static class RicData {
@@ -102,8 +139,12 @@
this.a1Client = a1Client;
}
+ A1Client getClient() {
+ return a1Client;
+ }
+
final Ric ric;
- final A1Client a1Client;
+ private final A1Client a1Client;
}
private Mono<RicData> createRicData(Ric ric) {
@@ -116,7 +157,7 @@
if (ric.ric.getState() == RicState.UNAVAILABLE) {
return startSynchronization(ric) //
.onErrorResume(t -> Mono.empty());
- } else if (ric.ric.getState() == RicState.SYNCHRONIZING) {
+ } else if (ric.ric.getState() == RicState.SYNCHRONIZING || ric.ric.getState() == RicState.CONSISTENCY_CHECK) {
return Mono.empty();
} else {
return Mono.just(ric);
@@ -124,7 +165,7 @@
}
private Mono<RicData> checkRicPolicies(RicData ric) {
- return ric.a1Client.getPolicyIdentities() //
+ return ric.getClient().getPolicyIdentities() //
.flatMap(ricP -> validateInstances(ricP, ric));
}
@@ -144,7 +185,8 @@
}
private Mono<RicData> checkRicPolicyTypes(RicData ric) {
- return ric.a1Client.getPolicyTypeIdentities() //
+
+ return ric.getClient().getPolicyTypeIdentities() //
.flatMap(ricTypes -> validateTypes(ricTypes, ric));
}
@@ -163,7 +205,7 @@
private Mono<RicData> startSynchronization(RicData ric) {
RicSynchronizationTask synchronizationTask = createSynchronizationTask();
synchronizationTask.run(ric.ric);
- return Mono.error(new Exception("Syncronization started"));
+ return Mono.error(new SynchStartedException("Syncronization started"));
}
RicSynchronizationTask createSynchronizationTask() {
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
index 4e3fd0f..6905c70 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
@@ -203,6 +203,12 @@
rsp = restClient().get(url).block();
assertThat(rsp).contains("ric2");
assertThat(rsp).doesNotContain("ric1");
+ assertThat(rsp).contains("AVAILABLE");
+
+ // All RICs
+ rsp = restClient().get("/rics").block();
+ assertThat(rsp).contains("ric2");
+ assertThat(rsp).contains("ric1");
// Non existing policy type
url = "/rics?policyType=XXXX";
@@ -211,23 +217,38 @@
@Test
public void testSynchronization() throws Exception {
- addRic("ric").setState(Ric.RicState.UNAVAILABLE);
- String ricName = "ric";
- Policy policy2 = addPolicy("policyId2", "typeName", "service", ricName);
+ // Two polictypes will be put in the NearRT RICs
+ PolicyTypes nearRtRicPolicyTypes = new PolicyTypes();
+ nearRtRicPolicyTypes.put(createPolicyType("typeName"));
+ nearRtRicPolicyTypes.put(createPolicyType("typeName2"));
+ this.a1ClientFactory.setPolicyTypes(nearRtRicPolicyTypes);
- getA1Client(ricName).putPolicy(policy2); // put it in the RIC
+ // One type and one instance added to the agent storage
+ final String ric1Name = "ric1";
+ Ric ric1 = addRic(ric1Name);
+ Policy policy2 = addPolicy("policyId2", "typeName", "service", ric1Name);
+ Ric ric2 = addRic("ric2");
+
+ getA1Client(ric1Name).putPolicy(policy2); // put it in the RIC
policies.remove(policy2); // Remove it from the repo -> should be deleted in the RIC
String policyId = "policyId";
- Policy policy = addPolicy(policyId, "typeName", "service", ricName); // This should be created in the RIC
+ Policy policy = addPolicy(policyId, "typeName", "service", ric1Name); // This should be created in the RIC
supervision.checkAllRics(); // The created policy should be put in the RIC
- await().untilAsserted(() -> RicState.SYNCHRONIZING.equals(rics.getRic(ricName).getState()));
- await().untilAsserted(() -> RicState.AVAILABLE.equals(rics.getRic(ricName).getState()));
- Policies ricPolicies = getA1Client(ricName).getPolicies();
+ // Wait until synch is completed
+ await().untilAsserted(() -> RicState.SYNCHRONIZING.equals(rics.getRic(ric1Name).getState()));
+ await().untilAsserted(() -> RicState.AVAILABLE.equals(rics.getRic(ric1Name).getState()));
+ await().untilAsserted(() -> RicState.AVAILABLE.equals(rics.getRic("ric2").getState()));
+
+ Policies ricPolicies = getA1Client(ric1Name).getPolicies();
assertThat(ricPolicies.size()).isEqualTo(1);
Policy ricPolicy = ricPolicies.get(policyId);
assertThat(ricPolicy.json()).isEqualTo(policy.json());
+
+ // Both types should be in the agent storage after the synch
+ assertThat(ric1.getSupportedPolicyTypes().size()).isEqualTo(2);
+ assertThat(ric2.getSupportedPolicyTypes().size()).isEqualTo(2);
}
@Test
@@ -695,12 +716,15 @@
return a1ClientFactory.getOrCreateA1Client(ricName);
}
- private PolicyType addPolicyType(String policyTypeName, String ricName) {
- PolicyType type = ImmutablePolicyType.builder() //
+ private PolicyType createPolicyType(String policyTypeName) {
+ return ImmutablePolicyType.builder() //
.name(policyTypeName) //
.schema("{\"title\":\"" + policyTypeName + "\"}") //
.build();
+ }
+ private PolicyType addPolicyType(String policyTypeName, String ricName) {
+ PolicyType type = createPolicyType(policyTypeName);
policyTypes.put(type);
addRic(ricName).addSupportedPolicyType(type);
return type;
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java
index 0a5b27b..73ca351 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java
@@ -104,7 +104,6 @@
@BeforeEach
public void init() {
- doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
types.clear();
policies.clear();
rics.clear();
@@ -123,6 +122,7 @@
@Test
public void whenRicIdleAndNoChangedPoliciesOrPolicyTypes_thenNoSynchronization() {
+ doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
RIC_1.setState(RicState.AVAILABLE);
RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
rics.put(RIC_1);
@@ -144,6 +144,7 @@
@Test
public void whenRicUndefined_thenSynchronization() {
+ doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
RIC_1.setState(RicState.UNAVAILABLE);
rics.put(RIC_1);
@@ -161,6 +162,7 @@
@Test
public void whenRicSynchronizing_thenNoSynchronization() {
+ doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
RIC_1.setState(RicState.SYNCHRONIZING);
rics.put(RIC_1);
@@ -174,6 +176,7 @@
@Test
public void whenRicIdleAndErrorGettingPolicyIdentities_thenNoSynchronization() {
+ doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
RIC_1.setState(RicState.AVAILABLE);
RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
rics.put(RIC_1);
@@ -185,10 +188,12 @@
verify(supervisorUnderTest).checkAllRics();
verifyNoMoreInteractions(supervisorUnderTest);
+ assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
}
@Test
public void whenRicIdleAndNotSameAmountOfPolicies_thenSynchronization() {
+ doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
RIC_1.setState(RicState.AVAILABLE);
rics.put(RIC_1);
@@ -211,6 +216,7 @@
@Test
public void whenRicIdleAndSameAmountOfPoliciesButNotSamePolicies_thenSynchronization() {
+ doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
RIC_1.setState(RicState.AVAILABLE);
rics.put(RIC_1);
@@ -233,6 +239,7 @@
@Test
public void whenRicIdleAndErrorGettingPolicyTypes_thenNoSynchronization() {
+ doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
RIC_1.setState(RicState.AVAILABLE);
RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
rics.put(RIC_1);
@@ -249,6 +256,7 @@
@Test
public void whenRicIdleAndNotSameAmountOfPolicyTypes_thenSynchronization() {
+ doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
RIC_1.setState(RicState.AVAILABLE);
RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
rics.put(RIC_1);
@@ -272,6 +280,7 @@
@Test
public void whenRicIdleAndSameAmountOfPolicyTypesButNotSameTypes_thenSynchronization() {
+ doReturn(Mono.just(a1ClientMock)).when(a1ClientFactory).createA1Client(any(Ric.class));
PolicyType policyType2 = ImmutablePolicyType.builder() //
.name("policyType2") //
.schema("") //
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java
index 9967958..c77259c 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java
@@ -40,7 +40,7 @@
public class MockA1ClientFactory extends A1ClientFactory {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<String, MockA1Client> clients = new HashMap<>();
- private final PolicyTypes policyTypes;
+ private PolicyTypes policyTypes;
private Duration asynchDelay = Duration.ofSeconds(0);
public MockA1ClientFactory(PolicyTypes policyTypes) {
@@ -62,6 +62,10 @@
return clients.get(ricName);
}
+ public void setPolicyTypes(PolicyTypes policyTypes) {
+ this.policyTypes = policyTypes;
+ }
+
/**
* Simulate network latency. The REST responses will be generated by separate
* threads
@@ -77,4 +81,8 @@
clients.clear();
}
+ public PolicyTypes getPolicyTypes() {
+ return this.policyTypes;
+ }
+
}