Bugfix in RIC synchronization
Concurrent modification exception when allpolicies for a RIC was deleted.
Fixed MockA1Client so it can simulate network delays
Change-Id: I1883b42a7afac303770084625a1e45acbf89c28f
Issue-ID: NONRTRIC-164
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
index 6110c58..3879fd6 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
@@ -22,6 +22,8 @@
import static org.oransc.policyagent.repository.Ric.RicState;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Vector;
import org.oransc.policyagent.clients.A1Client;
@@ -40,6 +42,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -89,15 +92,23 @@
.flatMap(Lock::unlock) //
.flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
.flatMapMany(client -> startSynchronization(ric, client)) //
- .subscribe(x -> logger.debug("Synchronize: {}", x), //
- throwable -> onSynchronizationError(ric, throwable), //
- () -> onSynchronizationComplete(ric));
+ .subscribe(new BaseSubscriber<Object>() {
+ @Override
+ protected void hookOnError(Throwable throwable) {
+ startDeleteAllPolicyInstances(ric, throwable);
+ }
+
+ @Override
+ protected void hookOnComplete() {
+ onSynchronizationComplete(ric);
+ }
+ });
}
private Flux<Object> startSynchronization(Ric ric, A1Client a1Client) {
Flux<PolicyType> synchronizedTypes = synchronizePolicyTypes(ric, a1Client);
Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies();
- Flux<?> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
+ Flux<Policy> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
return Flux.concat(synchronizedTypes, policiesDeletedInRic, policiesRecreatedInRic);
}
@@ -115,7 +126,7 @@
if (service.getCallbackUrl().length() > 0) {
createNotificationClient(url) //
.put("", body) //
- .subscribe(
+ .subscribe( //
notUsed -> logger.debug("Service {} notified", service.getName()), throwable -> logger
.warn("Service notification failed for service: {}", service.getName(), throwable),
() -> logger.debug("All services notified"));
@@ -124,7 +135,7 @@
}
}
- private void onSynchronizationError(Ric ric, Throwable t) {
+ private void startDeleteAllPolicyInstances(Ric ric, Throwable t) {
logger.warn("Synchronization failed for ric: {}, reason: {}", ric.name(), t.getMessage());
// If synchronization fails, try to remove all instances
deleteAllPoliciesInRepository(ric);
@@ -137,11 +148,11 @@
Flux.concat(synchronizedTypes, deletePoliciesInRic) //
.subscribe(x -> logger.debug("Brute recovery of failed synchronization: {}", x), //
- throwable -> onSynchronizationRecoveryError(ric, throwable), //
+ throwable -> onDeleteAllPolicyInstancesError(ric, throwable), //
() -> onSynchronizationComplete(ric));
}
- private void onSynchronizationRecoveryError(Ric ric, Throwable t) {
+ private void onDeleteAllPolicyInstancesError(Ric ric, Throwable t) {
logger.warn("Synchronization failure recovery failed for ric: {}, reason: {}", ric.name(), t.getMessage());
ric.setState(RicState.UNDEFINED);
}
@@ -175,18 +186,23 @@
private void deleteAllPoliciesInRepository(Ric ric) {
synchronized (policies) {
- for (Policy policy : policies.getForRic(ric.name())) {
+ List<Policy> ricPolicies = new ArrayList<>(policies.getForRic(ric.name()));
+ for (Policy policy : ricPolicies) {
this.policies.remove(policy);
}
}
}
- private Flux<String> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
+ private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) {
+ logger.debug("Recreating policy: {}, for ric: {}", policy.id(), ric.getConfig().name());
+ return a1Client.putPolicy(policy) //
+ .flatMapMany(notUsed -> Flux.just(policy));
+ }
+
+ private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
synchronized (policies) {
return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) //
- .doOnNext(
- policy -> logger.debug("Recreating policy: {}, for ric: {}", policy.id(), ric.getConfig().name()))
- .flatMap(a1Client::putPolicy);
+ .flatMap(policy -> putPolicy(policy, ric, a1Client));
}
}
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java
index af8ecd4..3265d76 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java
@@ -20,6 +20,7 @@
package org.oransc.policyagent.utils;
+import java.time.Duration;
import java.util.List;
import java.util.Vector;
@@ -31,13 +32,16 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoSink;
public class MockA1Client implements A1Client {
Policies policies = new Policies();
private final PolicyTypes policyTypes;
+ private final Duration asynchDelay;
- public MockA1Client(PolicyTypes policyTypes) {
+ public MockA1Client(PolicyTypes policyTypes, Duration asynchDelay) {
this.policyTypes = policyTypes;
+ this.asynchDelay = asynchDelay;
}
@Override
@@ -47,7 +51,7 @@
for (PolicyType p : this.policyTypes.getAll()) {
result.add(p.name());
}
- return Mono.just(result);
+ return mono(result);
}
}
@@ -59,14 +63,14 @@
result.add(policy.id());
}
- return Mono.just(result);
+ return mono(result);
}
}
@Override
public Mono<String> getPolicyTypeSchema(String policyTypeId) {
try {
- return Mono.just(this.policyTypes.getType(policyTypeId).schema());
+ return mono(this.policyTypes.getType(policyTypeId).schema());
} catch (Exception e) {
return Mono.error(e);
}
@@ -75,13 +79,14 @@
@Override
public Mono<String> putPolicy(Policy p) {
this.policies.put(p);
- return Mono.just("OK");
+ return mono("OK");
+
}
@Override
public Mono<String> deletePolicy(Policy policy) {
this.policies.remove(policy);
- return Mono.just("OK");
+ return mono("OK");
}
public Policies getPolicies() {
@@ -90,18 +95,44 @@
@Override
public Mono<A1ProtocolType> getProtocolVersion() {
- return Mono.just(A1ProtocolType.STD_V1_1);
+ return mono(A1ProtocolType.STD_V1_1);
}
@Override
public Flux<String> deleteAllPolicies() {
this.policies.clear();
- return Flux.empty();
+ return mono("OK") //
+ .flatMapMany(Flux::just);
}
@Override
public Mono<String> getPolicyStatus(Policy policy) {
- return Mono.just("OK");
+ return mono("OK");
+ }
+
+ private <T> Mono<T> mono(T value) {
+ if (this.asynchDelay.isZero()) {
+ return Mono.just(value);
+ } else {
+ return Mono.create(monoSink -> asynchResponse(monoSink, value));
+ }
+ }
+
+ @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+ private void sleep() {
+ try {
+ Thread.sleep(this.asynchDelay.toMillis());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private <T> void asynchResponse(MonoSink<T> callback, T str) {
+ Thread thread = new Thread(() -> {
+ sleep(); // Simulate a network delay
+ callback.success(str);
+ });
+ thread.start();
}
}
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java
index 52682fa..c1fd8c3 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java
@@ -24,6 +24,7 @@
import static org.mockito.Mockito.spy;
import java.lang.invoke.MethodHandles;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
@@ -40,6 +41,7 @@
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<String, MockA1Client> clients = new HashMap<>();
private final PolicyTypes policyTypes;
+ private Duration asynchDelay = Duration.ofSeconds(0);
public MockA1ClientFactory(PolicyTypes policyTypes) {
super(mock(ApplicationConfig.class));
@@ -54,10 +56,20 @@
public MockA1Client getOrCreateA1Client(String ricName) {
if (!clients.containsKey(ricName)) {
logger.debug("Creating client for RIC: {}", ricName);
- MockA1Client client = spy(new MockA1Client(policyTypes));
+ MockA1Client client = spy(new MockA1Client(policyTypes, asynchDelay));
clients.put(ricName, client);
}
return clients.get(ricName);
}
+ /**
+ * Simulate network latency. The REST responses will be generated by separate
+ * threads
+ *
+ * @param delay the delay between the request and the response
+ */
+ public void setResponseDelay(Duration delay) {
+ this.asynchDelay = delay;
+ }
+
}