Changed recovery so that policies will be reconfigured in the RIC after RIC restart

Instead of just removing all policies by a recovery, all policies are
recreated in the RIC.

Change-Id: Id9bd6f954ee7e156bbf6084e585da7827b89f830
Issue-ID: NONRTRIC-84
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
index 246fdd4..affce2c 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
@@ -145,8 +145,8 @@
         @RequestParam(name = "instance", required = true) String id) {
         Policy policy = policies.get(id);
         if (policy != null && policy.ric().state().equals(Ric.RicState.IDLE)) {
+            policies.remove(policy);
             return a1Client.deletePolicy(policy.ric().getConfig().baseUrl(), id) //
-                .doOnEach(notUsed -> policies.removeId(id)) //
                 .flatMap(notUsed -> {
                     return Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT));
                 });
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java
index fb41e26..8b3fadb 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicRecoveryTask.java
@@ -20,7 +20,6 @@
 
 package org.oransc.policyagent.tasks;
 
-import java.util.Collection;
 import java.util.Vector;
 
 import org.oransc.policyagent.clients.A1Client;
@@ -32,7 +31,6 @@
 import org.oransc.policyagent.repository.PolicyType;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
-import org.oransc.policyagent.repository.Rics;
 import org.oransc.policyagent.repository.Service;
 import org.oransc.policyagent.repository.Services;
 import org.slf4j.Logger;
@@ -42,7 +40,11 @@
 import reactor.core.publisher.Mono;
 
 /**
- * Loads information about RealTime-RICs at startup.
+ * Recovery handling of RIC, which means:
+ * - load all policy types
+ * - send all policy instances to the RIC
+ * --- if that fails remove all policy instances
+ * - Notify subscribing services
  */
 public class RicRecoveryTask {
 
@@ -60,14 +62,6 @@
         this.services = services;
     }
 
-    public void run(Rics rics) {
-        synchronized (rics) {
-            for (Ric ric : rics.getRics()) {
-                run(ric);
-            }
-        }
-    }
-
     public void run(Ric ric) {
         logger.debug("Handling ric: {}", ric.getConfig().name());
 
@@ -77,16 +71,17 @@
             }
             ric.setState(Ric.RicState.RECOVERING);
         }
-        Flux<PolicyType> recoveredTypes = recoverPolicyTypes(ric);
-        Flux<?> deletedPolicies = deletePolicies(ric);
+        Flux<PolicyType> recoverTypes = recoverPolicyTypes(ric);
+        Flux<?> deletePoliciesInRic = deleteAllPoliciesInRic(ric);
+        Flux<?> recreatePoliciesInRic = recreateAllPoliciesInRic(ric);
 
-        Flux.merge(recoveredTypes, deletedPolicies) //
+        Flux.concat(recoverTypes, deletePoliciesInRic, recreatePoliciesInRic) //
             .subscribe(x -> logger.debug("Recover: " + x), //
-                throwable -> onError(ric, throwable), //
-                () -> onComplete(ric));
+                throwable -> onRecoveryError(ric, throwable), //
+                () -> onRecoveryComplete(ric));
     }
 
-    private void onComplete(Ric ric) {
+    private void onRecoveryComplete(Ric ric) {
         logger.debug("Recovery completed for:" + ric.name());
         ric.setState(Ric.RicState.IDLE);
         notifyAllServices("Recovery completed for:" + ric.name());
@@ -107,8 +102,22 @@
         }
     }
 
-    private void onError(Ric ric, Throwable t) {
+    private void onRecoveryError(Ric ric, Throwable t) {
         logger.warn("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage());
+
+        // If recovery fails, try to remove all instances
+        deleteAllPolicies(ric);
+        Flux<PolicyType> recoverTypes = recoverPolicyTypes(ric);
+        Flux<?> deletePoliciesInRic = deleteAllPoliciesInRic(ric);
+
+        Flux.merge(recoverTypes, deletePoliciesInRic) //
+            .subscribe(x -> logger.debug("Brute recover: " + x), //
+                throwable -> onRemoveAllError(ric, throwable), //
+                () -> onRecoveryComplete(ric));
+    }
+
+    private void onRemoveAllError(Ric ric, Throwable t) {
+        logger.warn("Remove all failed for: {}, reason: {}", ric.name(), t.getMessage());
         ric.setState(Ric.RicState.UNDEFINED);
     }
 
@@ -143,17 +152,28 @@
         return Mono.just(pt);
     }
 
-    private Flux<String> deletePolicies(Ric ric) {
+    private void deleteAllPolicies(Ric ric) {
         synchronized (policies) {
-            Collection<Policy> ricPolicies = new Vector<>(policies.getForRic(ric.name()));
-            for (Policy policy : ricPolicies) {
+            for (Policy policy : policies.getForRic(ric.name())) {
                 this.policies.remove(policy);
             }
         }
+    }
 
+    private Flux<String> deleteAllPoliciesInRic(Ric ric) {
         return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
             .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) //
             .doOnNext(policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name()))
             .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)); //
     }
+
+    private Flux<String> recreateAllPoliciesInRic(Ric ric) {
+        synchronized (policies) {
+            return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) //
+                .doOnNext(
+                    policy -> logger.debug("Recreating policy: {}, for ric: {}", policy.id(), ric.getConfig().name()))
+                .flatMap(policy -> a1Client.putPolicy(policy));
+        }
+    }
+
 }
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
index d78155d..d077017 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
@@ -51,6 +51,7 @@
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
 import org.oransc.policyagent.repository.Rics;
+import org.oransc.policyagent.tasks.RepositorySupervision;
 import org.oransc.policyagent.utils.MockA1Client;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
@@ -79,6 +80,12 @@
     @Autowired
     private PolicyTypes policyTypes;
 
+    @Autowired
+    MockA1Client a1Client;
+
+    @Autowired
+    RepositorySupervision supervision;
+
     private static Gson gson = new GsonBuilder() //
         .serializeNulls() //
         .create(); //
@@ -96,8 +103,6 @@
      */
     @TestConfiguration
     static class TestBeanFactory {
-        private final Rics rics = new Rics();
-        private final Policies policies = new Policies();
         private final PolicyTypes policyTypes = new PolicyTypes();
 
         @Bean
@@ -112,7 +117,7 @@
 
         @Bean
         public Policies getPolicies() {
-            return this.policies;
+            return new Policies();
         }
 
         @Bean
@@ -122,7 +127,7 @@
 
         @Bean
         public Rics getRics() {
-            return this.rics;
+            return new Rics();
         }
     }
 
@@ -153,6 +158,22 @@
     }
 
     @Test
+    public void testRecovery() throws Exception {
+        reset();
+        Policy policy = addPolicy("policyId", "typeName", "service", "ric"); // This should be created in the RIC
+
+        Policy policy2 = addPolicy("policyId2", "typeName", "service", "ric");
+        a1Client.putPolicy("ric", policy2); // put it in the RIC
+        policies.remove(policy2); // Remove it from the repo -> should be deleted in the RIC
+
+        supervision.checkAllRics(); // The created policy should be put in the RIC
+        Policies ricPolicies = a1Client.getPolicies("ric");
+        assertThat(ricPolicies.size()).isEqualTo(1);
+        Policy ricPolicy = ricPolicies.get("policyId");
+        assertThat(ricPolicy.json()).isEqualTo(policy.json());
+    }
+
+    @Test
     public void testGetRic() throws Exception {
         reset();
         String url = baseUrl() + "/ric?managedElementId=kista_1";
@@ -201,7 +222,7 @@
         Vector<String> mes = new Vector<>();
         RicConfig conf = ImmutableRicConfig.builder() //
             .name(ricName) //
-            .baseUrl("baseUrl") //
+            .baseUrl(ricName) //
             .managedElementIds(mes) //
             .build();
         Ric ric = new Ric(conf);
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
index ecb4661..8b488d8 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
@@ -50,13 +50,11 @@
 public class MockPolicyAgent {
 
     static class MockApplicationConfig extends ApplicationConfig {
-
         @Override
         protected String getLocalConfigurationFilePath() {
             URL url = MockApplicationConfig.class.getClassLoader().getResource("test_application_configuration.json");
             return url.getFile();
         }
-
     }
 
     /**
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java
index b1f397a..1330a14 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java
@@ -21,6 +21,7 @@
 package org.oransc.policyagent.tasks;
 
 import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -105,6 +106,7 @@
         when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.empty());
         when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyIds);
         when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("schema"));
+        when(a1ClientMock.putPolicy(any())).thenReturn(Mono.just("OK"));
 
         supervisorUnderTest.checkAllRics();
 
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java
index 729fc7b..e4fd6e6 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/StartupServiceTest.java
@@ -142,14 +142,9 @@
 
     @Test
     public void startup_unableToConnectToGetTypes() {
-        Mono<Collection<String>> policyIdentities = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
         Mono<?> error = Mono.error(new Exception("Unable to contact ric."));
-        doReturn(error, policyIdentities).when(a1ClientMock).getPolicyTypeIdentities(anyString());
-
-        Mono<Collection<String>> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2));
-        doReturn(policies).when(a1ClientMock).getPolicyIdentities(anyString());
-        when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema"));
-        when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK"));
+        doReturn(error, error).when(a1ClientMock).getPolicyTypeIdentities(anyString());
+        doReturn(error).when(a1ClientMock).getPolicyIdentities(anyString());
 
         Rics rics = new Rics();
         PolicyTypes policyTypes = new PolicyTypes();
@@ -159,48 +154,28 @@
         serviceUnderTest.startup();
         serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A),
             ApplicationConfig.RicConfigUpdate.ADDED);
-        serviceUnderTest.onRicConfigUpdate(
-            getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C),
-            ApplicationConfig.RicConfigUpdate.ADDED);
-
-        verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_1);
-        verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_2);
 
         assertEquals(RicState.UNDEFINED, rics.get(FIRST_RIC_NAME).state(), "Not correct state for " + FIRST_RIC_NAME);
-
-        assertEquals(IDLE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME);
     }
 
     @Test
     public void startup_unableToConnectToGetPolicies() {
 
-        Mono<Collection<String>> policyTypes1 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
-        Mono<Collection<String>> policyTypes2 = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME, POLICY_TYPE_2_NAME));
-        when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes1).thenReturn(policyTypes2);
+        Mono<Collection<String>> policyTypes = Mono.just(Arrays.asList(POLICY_TYPE_1_NAME));
+        when(a1ClientMock.getPolicyTypeIdentities(anyString())).thenReturn(policyTypes);
         when(a1ClientMock.getPolicyType(anyString(), anyString())).thenReturn(Mono.just("Schema"));
-        Mono<Collection<String>> policies = Mono.just(Arrays.asList(POLICY_ID_1, POLICY_ID_2));
-        doReturn(Mono.error(new Exception("Unable to contact ric.")), policies).when(a1ClientMock)
-            .getPolicyIdentities(anyString());
-        when(a1ClientMock.deletePolicy(anyString(), anyString())).thenReturn(Mono.just("OK"));
+        Mono<?> error = Mono.error(new Exception("Unable to contact ric."));
+        doReturn(error).when(a1ClientMock).getPolicyIdentities(anyString());
 
         Rics rics = new Rics();
-        PolicyTypes policyTypes = new PolicyTypes();
         StartupService serviceUnderTest =
-            new StartupService(appConfigMock, rics, policyTypes, a1ClientMock, new Policies(), new Services());
+            new StartupService(appConfigMock, rics, new PolicyTypes(), a1ClientMock, new Policies(), new Services());
 
         serviceUnderTest.startup();
         serviceUnderTest.onRicConfigUpdate(getRicConfig(FIRST_RIC_NAME, FIRST_RIC_URL, MANAGED_NODE_A),
             ApplicationConfig.RicConfigUpdate.ADDED);
-        serviceUnderTest.onRicConfigUpdate(
-            getRicConfig(SECOND_RIC_NAME, SECOND_RIC_URL, MANAGED_NODE_B, MANAGED_NODE_C),
-            ApplicationConfig.RicConfigUpdate.ADDED);
-
-        verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_1);
-        verify(a1ClientMock).deletePolicy(SECOND_RIC_URL, POLICY_ID_2);
 
         assertEquals(RicState.UNDEFINED, rics.get(FIRST_RIC_NAME).state(), "Not correct state for " + FIRST_RIC_NAME);
-
-        assertEquals(IDLE, rics.get(SECOND_RIC_NAME).state(), "Not correct state for " + SECOND_RIC_NAME);
     }
 
     @SafeVarargs
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java
index aca29f5..bac2739 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java
@@ -86,11 +86,15 @@
         return Mono.just("OK");
     }
 
-    private Policies getPolicies(String url) {
+    public Policies getPolicies(String url) {
         if (!policies.containsKey(url)) {
             policies.put(url, new Policies());
         }
         return policies.get(url);
     }
 
+    public void putPolicy(String url, Policy policy) {
+        getPolicies(url).put(policy);
+    }
+
 }