Bugfix in RIC synchronization

Concurrent modification exception when allpolicies for a RIC was deleted.

Fixed MockA1Client so it can simulate network delays

Change-Id: I1883b42a7afac303770084625a1e45acbf89c28f
Issue-ID: NONRTRIC-164
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
index 6110c58..3879fd6 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
@@ -22,6 +22,8 @@
 
 import static org.oransc.policyagent.repository.Ric.RicState;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Vector;
 
 import org.oransc.policyagent.clients.A1Client;
@@ -40,6 +42,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import reactor.core.publisher.BaseSubscriber;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -89,15 +92,23 @@
             .flatMap(Lock::unlock) //
             .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
             .flatMapMany(client -> startSynchronization(ric, client)) //
-            .subscribe(x -> logger.debug("Synchronize: {}", x), //
-                throwable -> onSynchronizationError(ric, throwable), //
-                () -> onSynchronizationComplete(ric));
+            .subscribe(new BaseSubscriber<Object>() {
+                @Override
+                protected void hookOnError(Throwable throwable) {
+                    startDeleteAllPolicyInstances(ric, throwable);
+                }
+
+                @Override
+                protected void hookOnComplete() {
+                    onSynchronizationComplete(ric);
+                }
+            });
     }
 
     private Flux<Object> startSynchronization(Ric ric, A1Client a1Client) {
         Flux<PolicyType> synchronizedTypes = synchronizePolicyTypes(ric, a1Client);
         Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies();
-        Flux<?> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
+        Flux<Policy> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
 
         return Flux.concat(synchronizedTypes, policiesDeletedInRic, policiesRecreatedInRic);
     }
@@ -115,7 +126,7 @@
                 if (service.getCallbackUrl().length() > 0) {
                     createNotificationClient(url) //
                         .put("", body) //
-                        .subscribe(
+                        .subscribe( //
                             notUsed -> logger.debug("Service {} notified", service.getName()), throwable -> logger
                                 .warn("Service notification failed for service: {}", service.getName(), throwable),
                             () -> logger.debug("All services notified"));
@@ -124,7 +135,7 @@
         }
     }
 
-    private void onSynchronizationError(Ric ric, Throwable t) {
+    private void startDeleteAllPolicyInstances(Ric ric, Throwable t) {
         logger.warn("Synchronization failed for ric: {}, reason: {}", ric.name(), t.getMessage());
         // If synchronization fails, try to remove all instances
         deleteAllPoliciesInRepository(ric);
@@ -137,11 +148,11 @@
 
         Flux.concat(synchronizedTypes, deletePoliciesInRic) //
             .subscribe(x -> logger.debug("Brute recovery of failed synchronization: {}", x), //
-                throwable -> onSynchronizationRecoveryError(ric, throwable), //
+                throwable -> onDeleteAllPolicyInstancesError(ric, throwable), //
                 () -> onSynchronizationComplete(ric));
     }
 
-    private void onSynchronizationRecoveryError(Ric ric, Throwable t) {
+    private void onDeleteAllPolicyInstancesError(Ric ric, Throwable t) {
         logger.warn("Synchronization failure recovery failed for ric: {}, reason: {}", ric.name(), t.getMessage());
         ric.setState(RicState.UNDEFINED);
     }
@@ -175,18 +186,23 @@
 
     private void deleteAllPoliciesInRepository(Ric ric) {
         synchronized (policies) {
-            for (Policy policy : policies.getForRic(ric.name())) {
+            List<Policy> ricPolicies = new ArrayList<>(policies.getForRic(ric.name()));
+            for (Policy policy : ricPolicies) {
                 this.policies.remove(policy);
             }
         }
     }
 
-    private Flux<String> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
+    private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) {
+        logger.debug("Recreating policy: {}, for ric: {}", policy.id(), ric.getConfig().name());
+        return a1Client.putPolicy(policy) //
+            .flatMapMany(notUsed -> Flux.just(policy));
+    }
+
+    private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
         synchronized (policies) {
             return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) //
-                .doOnNext(
-                    policy -> logger.debug("Recreating policy: {}, for ric: {}", policy.id(), ric.getConfig().name()))
-                .flatMap(a1Client::putPolicy);
+                .flatMap(policy -> putPolicy(policy, ric, a1Client));
         }
     }
 
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java
index af8ecd4..3265d76 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java
@@ -20,6 +20,7 @@
 
 package org.oransc.policyagent.utils;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Vector;
 
@@ -31,13 +32,16 @@
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoSink;
 
 public class MockA1Client implements A1Client {
     Policies policies = new Policies();
     private final PolicyTypes policyTypes;
+    private final Duration asynchDelay;
 
-    public MockA1Client(PolicyTypes policyTypes) {
+    public MockA1Client(PolicyTypes policyTypes, Duration asynchDelay) {
         this.policyTypes = policyTypes;
+        this.asynchDelay = asynchDelay;
     }
 
     @Override
@@ -47,7 +51,7 @@
             for (PolicyType p : this.policyTypes.getAll()) {
                 result.add(p.name());
             }
-            return Mono.just(result);
+            return mono(result);
         }
     }
 
@@ -59,14 +63,14 @@
                 result.add(policy.id());
             }
 
-            return Mono.just(result);
+            return mono(result);
         }
     }
 
     @Override
     public Mono<String> getPolicyTypeSchema(String policyTypeId) {
         try {
-            return Mono.just(this.policyTypes.getType(policyTypeId).schema());
+            return mono(this.policyTypes.getType(policyTypeId).schema());
         } catch (Exception e) {
             return Mono.error(e);
         }
@@ -75,13 +79,14 @@
     @Override
     public Mono<String> putPolicy(Policy p) {
         this.policies.put(p);
-        return Mono.just("OK");
+        return mono("OK");
+
     }
 
     @Override
     public Mono<String> deletePolicy(Policy policy) {
         this.policies.remove(policy);
-        return Mono.just("OK");
+        return mono("OK");
     }
 
     public Policies getPolicies() {
@@ -90,18 +95,44 @@
 
     @Override
     public Mono<A1ProtocolType> getProtocolVersion() {
-        return Mono.just(A1ProtocolType.STD_V1_1);
+        return mono(A1ProtocolType.STD_V1_1);
     }
 
     @Override
     public Flux<String> deleteAllPolicies() {
         this.policies.clear();
-        return Flux.empty();
+        return mono("OK") //
+            .flatMapMany(Flux::just);
     }
 
     @Override
     public Mono<String> getPolicyStatus(Policy policy) {
-        return Mono.just("OK");
+        return mono("OK");
+    }
+
+    private <T> Mono<T> mono(T value) {
+        if (this.asynchDelay.isZero()) {
+            return Mono.just(value);
+        } else {
+            return Mono.create(monoSink -> asynchResponse(monoSink, value));
+        }
+    }
+
+    @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+    private void sleep() {
+        try {
+            Thread.sleep(this.asynchDelay.toMillis());
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private <T> void asynchResponse(MonoSink<T> callback, T str) {
+        Thread thread = new Thread(() -> {
+            sleep(); // Simulate a network delay
+            callback.success(str);
+        });
+        thread.start();
     }
 
 }
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java
index 52682fa..c1fd8c3 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java
@@ -24,6 +24,7 @@
 import static org.mockito.Mockito.spy;
 
 import java.lang.invoke.MethodHandles;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -40,6 +41,7 @@
     private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
     private final Map<String, MockA1Client> clients = new HashMap<>();
     private final PolicyTypes policyTypes;
+    private Duration asynchDelay = Duration.ofSeconds(0);
 
     public MockA1ClientFactory(PolicyTypes policyTypes) {
         super(mock(ApplicationConfig.class));
@@ -54,10 +56,20 @@
     public MockA1Client getOrCreateA1Client(String ricName) {
         if (!clients.containsKey(ricName)) {
             logger.debug("Creating client for RIC: {}", ricName);
-            MockA1Client client = spy(new MockA1Client(policyTypes));
+            MockA1Client client = spy(new MockA1Client(policyTypes, asynchDelay));
             clients.put(ricName, client);
         }
         return clients.get(ricName);
     }
 
+    /**
+     * Simulate network latency. The REST responses will be generated by separate
+     * threads
+     * 
+     * @param delay the delay between the request and the response
+     */
+    public void setResponseDelay(Duration delay) {
+        this.asynchDelay = delay;
+    }
+
 }