Merge "Changed recovery so that policies will be reconfigured in the RIC after RIC restart"
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
index 246fdd4..affce2c 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
@@ -145,8 +145,8 @@
@RequestParam(name = "instance", required = true) String id) {
Policy policy = policies.get(id);
if (policy != null && policy.ric().state().equals(Ric.RicState.IDLE)) {
+ policies.remove(policy);
return a1Client.deletePolicy(policy.ric().getConfig().baseUrl(), id) //
- .doOnEach(notUsed -> policies.removeId(id)) //
.flatMap(notUsed -> {
return Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT));
});
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java
index fb41e26..8b3fadb 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java
@@ -20,7 +20,6 @@
package org.oransc.policyagent.tasks;
-import java.util.Collection;
import java.util.Vector;
import org.oransc.policyagent.clients.A1Client;
@@ -32,7 +31,6 @@
import org.oransc.policyagent.repository.PolicyType;
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
-import org.oransc.policyagent.repository.Rics;
import org.oransc.policyagent.repository.Service;
import org.oransc.policyagent.repository.Services;
import org.slf4j.Logger;
@@ -42,7 +40,11 @@
import reactor.core.publisher.Mono;
/**
- * Loads information about RealTime-RICs at startup.
+ * Recovery handling of a RIC, which means:
+ * - load all policy types
+ * - delete all policy instances found in the RIC
+ * - send all policy instances in the repository to the RIC
+ * --- if that fails, remove all policy instances
+ * - notify subscribing services
*/
public class RicRecoveryTask {
@@ -60,14 +62,6 @@
this.services = services;
}
- public void run(Rics rics) {
- synchronized (rics) {
- for (Ric ric : rics.getRics()) {
- run(ric);
- }
- }
- }
-
public void run(Ric ric) {
logger.debug("Handling ric: {}", ric.getConfig().name());
@@ -77,16 +71,17 @@
}
ric.setState(Ric.RicState.RECOVERING);
}
- Flux<PolicyType> recoveredTypes = recoverPolicyTypes(ric);
- Flux<?> deletedPolicies = deletePolicies(ric);
+ Flux<PolicyType> recoverTypes = recoverPolicyTypes(ric);
+ Flux<?> deletePoliciesInRic = deleteAllPoliciesInRic(ric);
+ Flux<?> recreatePoliciesInRic = recreateAllPoliciesInRic(ric);
- Flux.merge(recoveredTypes, deletedPolicies) //
+ Flux.concat(recoverTypes, deletePoliciesInRic, recreatePoliciesInRic) //
.subscribe(x -> logger.debug("Recover: " + x), //
- throwable -> onError(ric, throwable), //
- () -> onComplete(ric));
+ throwable -> onRecoveryError(ric, throwable), //
+ () -> onRecoveryComplete(ric));
}
- private void onComplete(Ric ric) {
+ private void onRecoveryComplete(Ric ric) {
logger.debug("Recovery completed for:" + ric.name());
ric.setState(Ric.RicState.IDLE);
notifyAllServices("Recovery completed for:" + ric.name());
@@ -107,8 +102,22 @@
}
}
- private void onError(Ric ric, Throwable t) {
+ /**
+ * Fallback used when the normal recovery sequence fails: logs the failure, removes the
+ * policies of the RIC from the repository, then reloads the policy types and deletes the
+ * policies found in the RIC. Completion is reported through onRecoveryComplete; a failure
+ * of this fallback is handed to onRemoveAllError.
+ */
+ private void onRecoveryError(Ric ric, Throwable t) {
logger.warn("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage());
+
+ // If recovery fails, try to remove all instances
+ deleteAllPolicies(ric);
+ Flux<PolicyType> recoverTypes = recoverPolicyTypes(ric);
+ Flux<?> deletePoliciesInRic = deleteAllPoliciesInRic(ric);
+
+ Flux.merge(recoverTypes, deletePoliciesInRic) //
+ .subscribe(x -> logger.debug("Brute recover: " + x), //
+ throwable -> onRemoveAllError(ric, throwable), //
+ () -> onRecoveryComplete(ric));
+ }
+
+ /**
+ * Called when the fallback clean-up fails as well: logs the reason and sets the state of
+ * the RIC to UNDEFINED.
+ */
+ private void onRemoveAllError(Ric ric, Throwable t) {
+ logger.warn("Remove all failed for: {}, reason: {}", ric.name(), t.getMessage());
ric.setState(Ric.RicState.UNDEFINED);
}
@@ -143,17 +152,28 @@
return Mono.just(pt);
}
- private Flux<String> deletePolicies(Ric ric) {
+ /**
+ * Removes every policy registered for the given RIC from the policy repository.
+ *
+ * @param ric the RIC whose policies are dropped from the repository
+ */
+ private void deleteAllPolicies(Ric ric) {
synchronized (policies) {
- Collection<Policy> ricPolicies = new Vector<>(policies.getForRic(ric.name()));
- for (Policy policy : ricPolicies) {
+ // Iterate over a snapshot: remove() mutates the repository while we loop, and
+ // getForRic() may hand back a live view of it.
+ for (Policy policy : new Vector<>(policies.getForRic(ric.name()))) {
this.policies.remove(policy);
}
}
+ }
+ /**
+ * Fetches the identities of the policies that exist in the RIC and issues a delete request
+ * to the RIC for each of them.
+ *
+ * @param ric the RIC to clean
+ * @return a Flux emitting the results of the individual delete calls
+ */
+ private Flux<String> deleteAllPoliciesInRic(Ric ric) {
return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
.flatMapMany(policyIds -> Flux.fromIterable(policyIds)) //
.doOnNext(policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name()))
.flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)); //
}
+
+ /**
+ * Sends every policy that the repository holds for the given RIC to that RIC.
+ * The set of policies is captured when this method is called, not when the
+ * returned Flux is subscribed to.
+ *
+ * @param ric the RIC to repopulate
+ * @return a Flux emitting the results of the individual put calls
+ */
+ private Flux<String> recreateAllPoliciesInRic(Ric ric) {
+ final Vector<Policy> snapshot;
+ synchronized (policies) {
+ snapshot = new Vector<>(policies.getForRic(ric.name()));
+ }
+ return Flux.fromIterable(snapshot) //
+ .doOnNext(
+ policy -> logger.debug("Recreating policy: {}, for ric: {}", policy.id(), ric.getConfig().name()))
+ .flatMap(policy -> a1Client.putPolicy(policy));
+ }
+
}
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
index d78155d..d077017 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
@@ -51,6 +51,7 @@
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
import org.oransc.policyagent.repository.Rics;
+import org.oransc.policyagent.tasks.RepositorySupervision;
import org.oransc.policyagent.utils.MockA1Client;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@@ -79,6 +80,12 @@
@Autowired
private PolicyTypes policyTypes;
+ @Autowired
+ MockA1Client a1Client;
+
+ @Autowired
+ RepositorySupervision supervision;
+
private static Gson gson = new GsonBuilder() //
.serializeNulls() //
.create(); //
@@ -96,8 +103,6 @@
*/
@TestConfiguration
static class TestBeanFactory {
- private final Rics rics = new Rics();
- private final Policies policies = new Policies();
private final PolicyTypes policyTypes = new PolicyTypes();
@Bean
@@ -112,7 +117,7 @@
@Bean
public Policies getPolicies() {
- return this.policies;
+ // The repository is now created in the bean method instead of being held in a field of the factory
+ return new Policies();
}
@Bean
@@ -122,7 +127,7 @@
@Bean
public Rics getRics() {
- return this.rics;
+ // The repository is now created in the bean method instead of being held in a field of the factory
+ return new Rics();
}
}
@@ -153,6 +158,22 @@
}
@Test
+ public void testRecovery() throws Exception {
+ reset();
+
+ // Present in the repository only: recovery is expected to create it in the RIC
+ Policy expectedPolicy = addPolicy("policyId", "typeName", "service", "ric");
+
+ // Present in the RIC only: recovery is expected to delete it from the RIC
+ Policy stalePolicy = addPolicy("policyId2", "typeName", "service", "ric");
+ a1Client.putPolicy("ric", stalePolicy);
+ policies.remove(stalePolicy);
+
+ supervision.checkAllRics();
+
+ // Only the policy known to the repository must remain in the RIC
+ Policies policiesInRic = a1Client.getPolicies("ric");
+ assertThat(policiesInRic.size()).isEqualTo(1);
+ assertThat(policiesInRic.get("policyId").json()).isEqualTo(expectedPolicy.json());
+ }
+
+ @Test
public void testGetRic() throws Exception {
reset();
String url = baseUrl() + "/ric?managedElementId=kista_1";
@@ -201,7 +222,7 @@
Vector<String> mes = new Vector<>();
RicConfig conf = ImmutableRicConfig.builder() //
.name(ricName) //
- .baseUrl("baseUrl") //
+ .baseUrl(ricName) //
.managedElementIds(mes) //
.build();
Ric ric = new Ric(conf);
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
index ecb4661..8b488d8 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
@@ -50,13 +50,11 @@
public class MockPolicyAgent {
+ /**
+ * Application configuration whose local configuration file path points at the
+ * test_application_configuration.json resource found through the class loader.
+ */
static class MockApplicationConfig extends ApplicationConfig {
-
@Override
protected String getLocalConfigurationFilePath() {
+ // NOTE(review): getResource() returns null when the resource is missing, which would make
+ // url.getFile() throw a NullPointerException - confirm the resource is always on the class path.
URL url = MockApplicationConfig.class.getClassLoader().getResource("test_application_configuration.json");
return url.getFile();
}
-
}
/**
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java
index b1f397a..1330a14 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java
@@ -21,6 +21,7 @@
package org.oransc.policyagent.tasks;
import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -105,6 +106,7 @@
when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.empty());
when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyIds);
when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("schema"));
+ when(a1ClientMock.putPolicy(any())).thenReturn(Mono.just("OK"));
supervisorUnderTest.checkAllRics();
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java
index 729fc7b..e4fd6e6 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java
@@ -142,14 +142,9 @@
@Test
public void startup_unableToConnectToGetTypes() {
- Mono<Collection<String>> policyIdentities = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
Mono<?> error = Mono.error(new Exception("Unable to contact ric."));
- doReturn(error, policyIdentities).when(a1ClientMock).getPolicyTypeIdentities(anyString());
-
- Mono<Collection<String>> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2));
- doReturn(policies).when(a1ClientMock).getPolicyIdentities(anyString());
- when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema"));
- when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK"));
+ doReturn(error, error).when(a1ClientMock).getPolicyTypeIdentities(anyString());
+ doReturn(error).when(a1ClientMock).getPolicyIdentities(anyString());
Rics rics = new Rics();
PolicyTypes policyTypes = new PolicyTypes();
@@ -159,48 +154,28 @@
serviceUnderTest.startup();
serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A),
ApplicationConfig.RicConfigUpdate.ADDED);
- serviceUnderTest.onRicConfigUpdate(
- getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C),
- ApplicationConfig.RicConfigUpdate.ADDED);
-
- verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_1);
- verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_2);
assertEquals(RicState.UNDEFINED, rics.get(FIRST_RIC_NAME).state(), "Not correct state for " + FIRST_RIC_NAME);
-
- assertEquals(IDLE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME);
}
@Test
public void startup_unableToConnectToGetPolicies() {
+ // The policy types can be fetched, but every attempt to list the policies in the RIC fails,
+ // so the RIC is expected to end up in state UNDEFINED.
- Mono<Collection<String>> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
- Mono<Collection<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
- when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes1).thenReturn(policyTypes2);
+ Mono<Collection<String>> policyTypes = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
+ when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes);
when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema"));
- Mono<Collection<String>> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2));
- doReturn(Mono.error(new Exception("Unable to contact ric.")), policies).when(a1ClientMock)
- .getPolicyIdentities(anyString());
- when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK"));
+ Mono<?> error = Mono.error(new Exception("Unable to contact ric."));
+ doReturn(error).when(a1ClientMock).getPolicyIdentities(anyString());
Rics rics = new Rics();
- PolicyTypes policyTypes = new PolicyTypes();
StartupService serviceUnderTest =
- new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies(), new Services());
+ new StartupService(appConfigMock, rics, new PolicyTypes(), a1ClientMock, new Policies(), new Services());
serviceUnderTest.startup();
serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A),
ApplicationConfig.RicConfigUpdate.ADDED);
- serviceUnderTest.onRicConfigUpdate(
- getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C),
- ApplicationConfig.RicConfigUpdate.ADDED);
-
- verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_1);
- verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_2);
assertEquals(RicState.UNDEFINED, rics.get(FIRST_RIC_NAME).state(), "Not correct state for " + FIRST_RIC_NAME);
-
- assertEquals(IDLE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME);
}
@SafeVarargs
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java
index aca29f5..bac2739 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java
@@ -86,11 +86,15 @@
return Mono.just("OK");
}
- private Policies getPolicies(String url) {
+ /**
+ * Returns the policies stored for the given URL; an empty Policies instance is stored
+ * and returned when nothing has been registered for that URL yet.
+ */
+ public Policies getPolicies(String url) {
if (!policies.containsKey(url)) {
policies.put(url, new Policies());
}
return policies.get(url);
}
+ /**
+ * Stores the policy directly in the policies kept for the given URL.
+ */
+ public void putPolicy(String url, Policy policy) {
+ Policies policiesForUrl = getPolicies(url);
+ policiesForUrl.put(policy);
+ }
+
}