Concurrency improvements
The test for concurrency is improved so it involves RIC
synchronizations.
The principles for synchronization is simplified so that
classes in repository always returns copies of collections.
RIC syncronization is taking an exclusive lock during syncronization,
which leads to that a client will not get an HTTP error when accessing
a RIC that is synched. Instead, the client will be kept waiting until
the synch is completed.
Change-Id: I67568e8ef63b4b559a341ed8136e41119c9b7e6b
Issue-ID: NONRTRIC-164
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
index 0f3b391..5ae54ef 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
@@ -95,17 +95,15 @@
@ApiResponse(code = 200, message = "Policy schemas", response = Object.class, responseContainer = "List"), //
@ApiResponse(code = 404, message = "RIC is not found", response = String.class)})
public ResponseEntity<String> getPolicySchemas(@RequestParam(name = "ric", required = false) String ricName) {
- synchronized (this.policyTypes) {
- if (ricName == null) {
- Collection<PolicyType> types = this.policyTypes.getAll();
+ if (ricName == null) {
+ Collection<PolicyType> types = this.policyTypes.getAll();
+ return new ResponseEntity<>(toPolicyTypeSchemasJson(types), HttpStatus.OK);
+ } else {
+ try {
+ Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
return new ResponseEntity<>(toPolicyTypeSchemasJson(types), HttpStatus.OK);
- } else {
- try {
- Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
- return new ResponseEntity<>(toPolicyTypeSchemasJson(types), HttpStatus.OK);
- } catch (ServiceException e) {
- return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND);
- }
+ } catch (ServiceException e) {
+ return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND);
}
}
}
@@ -136,17 +134,15 @@
responseContainer = "List"),
@ApiResponse(code = 404, message = "RIC is not found", response = String.class)})
public ResponseEntity<String> getPolicyTypes(@RequestParam(name = "ric", required = false) String ricName) {
- synchronized (this.policyTypes) {
- if (ricName == null) {
- Collection<PolicyType> types = this.policyTypes.getAll();
+ if (ricName == null) {
+ Collection<PolicyType> types = this.policyTypes.getAll();
+ return new ResponseEntity<>(toPolicyTypeIdsJson(types), HttpStatus.OK);
+ } else {
+ try {
+ Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
return new ResponseEntity<>(toPolicyTypeIdsJson(types), HttpStatus.OK);
- } else {
- try {
- Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
- return new ResponseEntity<>(toPolicyTypeIdsJson(types), HttpStatus.OK);
- } catch (ServiceException e) {
- return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND);
- }
+ } catch (ServiceException e) {
+ return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND);
}
}
}
@@ -174,19 +170,16 @@
value = { //
@ApiResponse(code = 204, message = "Policy deleted", response = Object.class),
@ApiResponse(code = 404, message = "Policy is not found", response = String.class),
- @ApiResponse(code = 423, message = "RIC is locked", response = String.class)})
+ @ApiResponse(code = 423, message = "RIC is not operational", response = String.class)})
public Mono<ResponseEntity<Object>> deletePolicy( //
@RequestParam(name = "instance", required = true) String id) {
- Policy policy;
try {
- policy = policies.getPolicy(id);
+ Policy policy = policies.getPolicy(id);
keepServiceAlive(policy.ownerServiceName());
- if (policy.ric().getState() != Ric.RicState.IDLE) {
- return Mono.just(new ResponseEntity<>("Busy, synchronizing", HttpStatus.LOCKED));
- }
Ric ric = policy.ric();
- return ric.getLock().lock(LockType.SHARED) // //
- .flatMap(lock -> a1ClientFactory.createA1Client(policy.ric())) //
+ return ric.getLock().lock(LockType.SHARED) //
+ .flatMap(notUsed -> assertRicStateIdle(ric)) //
+ .flatMap(notUsed -> a1ClientFactory.createA1Client(policy.ric())) //
.doOnNext(notUsed -> policies.remove(policy)) //
.flatMap(client -> client.deletePolicy(policy)) //
.doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
@@ -204,7 +197,7 @@
value = { //
@ApiResponse(code = 201, message = "Policy created", response = Object.class), //
@ApiResponse(code = 200, message = "Policy updated", response = Object.class), //
- @ApiResponse(code = 423, message = "RIC is locked", response = String.class), //
+ @ApiResponse(code = 423, message = "RIC is not operational", response = String.class), //
@ApiResponse(code = 404, message = "RIC or policy type is not found", response = String.class) //
})
public Mono<ResponseEntity<Object>> putPolicy( //
@@ -218,31 +211,30 @@
Ric ric = rics.get(ricName);
PolicyType type = policyTypes.get(typeName);
keepServiceAlive(service);
- if (ric != null && type != null && ric.getState() == Ric.RicState.IDLE) {
- Policy policy = ImmutablePolicy.builder() //
- .id(instanceId) //
- .json(jsonString) //
- .type(type) //
- .ric(ric) //
- .ownerServiceName(service) //
- .lastModified(getTimeStampUtc()) //
- .build();
-
- final boolean isCreate = this.policies.get(policy.id()) == null;
-
- return ric.getLock().lock(LockType.SHARED) //
- .flatMap(p -> validateModifiedPolicy(policy)) //
- .flatMap(notUsed -> a1ClientFactory.createA1Client(ric)) //
- .flatMap(client -> client.putPolicy(policy)) //
- .doOnNext(notUsed -> policies.put(policy)) //
- .doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
- .doOnError(t -> ric.getLock().unlockBlocking()) //
- .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) //
- .onErrorResume(this::handleException);
+ if (ric == null || type == null) {
+ return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
+ Policy policy = ImmutablePolicy.builder() //
+ .id(instanceId) //
+ .json(jsonString) //
+ .type(type) //
+ .ric(ric) //
+ .ownerServiceName(service) //
+ .lastModified(getTimeStampUtc()) //
+ .build();
- return ric == null || type == null ? Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND))
- : Mono.just(new ResponseEntity<>(HttpStatus.LOCKED)); // Synchronizing
+ final boolean isCreate = this.policies.get(policy.id()) == null;
+
+ return ric.getLock().lock(LockType.SHARED) //
+ .flatMap(p -> assertRicStateIdle(ric)) //
+ .flatMap(p -> validateModifiedPolicy(policy)) //
+ .flatMap(notUsed -> a1ClientFactory.createA1Client(ric)) //
+ .flatMap(client -> client.putPolicy(policy)) //
+ .doOnNext(notUsed -> policies.put(policy)) //
+ .doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
+ .doOnError(t -> ric.getLock().unlockBlocking()) //
+ .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) //
+ .onErrorResume(this::handleException);
}
@SuppressWarnings({"unchecked"})
@@ -275,6 +267,16 @@
return Mono.just("OK");
}
+ private Mono<Object> assertRicStateIdle(Ric ric) {
+ if (ric.getState() == Ric.RicState.IDLE) {
+ return Mono.just("OK");
+ } else {
+ RejectionException e = new RejectionException(
+ "Ric is not operational, RIC name: " + ric.name() + ", state: " + ric.getState(), HttpStatus.LOCKED);
+ return Mono.error(e);
+ }
+ }
+
@GetMapping("/policies")
@ApiOperation(value = "Query policies")
@ApiResponses(
@@ -292,10 +294,9 @@
if ((ric != null && this.rics.get(ric) == null)) {
return new ResponseEntity<>("RIC not found", HttpStatus.NOT_FOUND);
}
- synchronized (policies) {
- String filteredPolicies = policiesToJson(filter(type, ric, service));
- return new ResponseEntity<>(filteredPolicies, HttpStatus.OK);
- }
+
+ String filteredPolicies = policiesToJson(filter(type, ric, service));
+ return new ResponseEntity<>(filteredPolicies, HttpStatus.OK);
}
@GetMapping("/policy_ids")
@@ -314,10 +315,9 @@
if ((ric != null && this.rics.get(ric) == null)) {
return new ResponseEntity<>("RIC not found", HttpStatus.NOT_FOUND);
}
- synchronized (policies) {
- String policyIdsJson = toPolicyIdsJson(filter(type, ric, service));
- return new ResponseEntity<>(policyIdsJson, HttpStatus.OK);
- }
+
+ String policyIdsJson = toPolicyIdsJson(filter(type, ric, service));
+ return new ResponseEntity<>(policyIdsJson, HttpStatus.OK);
}
@GetMapping("/policy_status")
@@ -367,16 +367,14 @@
}
private Collection<Policy> filter(String type, String ric, String service) {
- synchronized (policies) {
- if (type != null) {
- return filter(policies.getForType(type), null, ric, service);
- } else if (service != null) {
- return filter(policies.getForService(service), type, ric, null);
- } else if (ric != null) {
- return filter(policies.getForRic(ric), type, null, service);
- } else {
- return policies.getAll();
- }
+ if (type != null) {
+ return filter(policies.getForType(type), null, ric, service);
+ } else if (service != null) {
+ return filter(policies.getForService(service), type, ric, null);
+ } else if (ric != null) {
+ return filter(policies.getForRic(ric), type, null, service);
+ } else {
+ return policies.getAll();
}
}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java
index a96766d..465ee78 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/RicRepositoryController.java
@@ -95,11 +95,9 @@
}
List<RicInfo> result = new ArrayList<>();
- synchronized (rics) {
- for (Ric ric : rics.getRics()) {
- if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) {
- result.add(new RicInfo(ric.name(), ric.getManagedElementIds(), ric.getSupportedPolicyTypeNames()));
- }
+ for (Ric ric : rics.getRics()) {
+ if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) {
+ result.add(new RicInfo(ric.name(), ric.getManagedElementIds(), ric.getSupportedPolicyTypeNames()));
}
}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java
index 2a34643..578e5c8 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java
@@ -31,7 +31,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
import org.oransc.policyagent.exceptions.ServiceException;
import org.oransc.policyagent.repository.Policies;
@@ -79,11 +78,9 @@
}
Collection<ServiceStatus> servicesStatus = new ArrayList<>();
- synchronized (this.services) {
- for (Service s : this.services.getAll()) {
- if (name == null || name.equals(s.getName())) {
- servicesStatus.add(toServiceStatus(s));
- }
+ for (Service s : this.services.getAll()) {
+ if (name == null || name.equals(s.getName())) {
+ servicesStatus.add(toServiceStatus(s));
}
}
@@ -157,19 +154,15 @@
}
private Service removeService(String name) throws ServiceException {
- synchronized (this.services) {
- Service service = this.services.getService(name);
- this.services.remove(service.getName());
- return service;
- }
+ Service service = this.services.getService(name); // Just to verify that it exists
+ this.services.remove(service.getName());
+ return service;
}
private void removePolicies(Service service) {
- synchronized (this.policies) {
- List<Policy> policyList = new ArrayList<>(this.policies.getForService(service.getName()));
- for (Policy policy : policyList) {
- this.policies.remove(policy);
- }
+ Collection<Policy> policyList = this.policies.getForService(service.getName());
+ for (Policy policy : policyList) {
+ this.policies.remove(policy);
}
}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java
index 62dd140..4e2ebfa 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Policies.java
@@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.Vector;
import org.oransc.policyagent.exceptions.ServiceException;
@@ -60,7 +61,7 @@
if (map == null) {
return Collections.emptyList();
}
- return Collections.unmodifiableCollection(map.values());
+ return new Vector<>(map.values());
}
public synchronized boolean containsPolicy(String id) {
@@ -80,7 +81,7 @@
}
public synchronized Collection<Policy> getAll() {
- return Collections.unmodifiableCollection(policiesId.values());
+ return new Vector<>(policiesId.values());
}
public synchronized Collection<Policy> getForService(String service) {
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java
index 7798231..2897a50 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/PolicyTypes.java
@@ -21,9 +21,9 @@
package org.oransc.policyagent.repository;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Vector;
import org.oransc.policyagent.exceptions.ServiceException;
@@ -51,7 +51,7 @@
}
public synchronized Collection<PolicyType> getAll() {
- return Collections.unmodifiableCollection(types.values());
+ return new Vector<>(types.values());
}
public synchronized int size() {
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java
index 3b8e587..66cecd3 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Rics.java
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.Vector;
import org.oransc.policyagent.exceptions.ServiceException;
@@ -37,7 +38,7 @@
}
public synchronized Iterable<Ric> getRics() {
- return registeredRics.values();
+ return new Vector<>(registeredRics.values());
}
public synchronized Ric getRic(String name) throws ServiceException {
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java
index f6c55dc..f829c7c 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java
@@ -20,9 +20,9 @@
package org.oransc.policyagent.repository;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Vector;
import org.oransc.policyagent.exceptions.ServiceException;
import org.slf4j.Logger;
@@ -51,7 +51,7 @@
}
public synchronized Iterable<Service> getAll() {
- return Collections.unmodifiableCollection(registeredServices.values());
+ return new Vector<>(registeredServices.values());
}
public synchronized void remove(String name) {
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java
index 52780d7..ba050df 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java
@@ -82,12 +82,11 @@
}
private Flux<RicData> createTask() {
- synchronized (this.rics) {
- return Flux.fromIterable(rics.getRics()) //
- .flatMap(this::createRicData) //
- .flatMap(this::checkOneRic) //
- .onErrorResume(throwable -> Mono.empty());
- }
+ return Flux.fromIterable(rics.getRics()) //
+ .flatMap(this::createRicData) //
+ .flatMap(this::checkOneRic) //
+ .onErrorResume(throwable -> Mono.empty());
+
}
private Mono<RicData> checkOneRic(RicData ricData) {
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
index 3879fd6..00ca0ed 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
@@ -22,15 +22,10 @@
import static org.oransc.policyagent.repository.Ric.RicState;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Vector;
-
import org.oransc.policyagent.clients.A1Client;
import org.oransc.policyagent.clients.A1ClientFactory;
import org.oransc.policyagent.clients.AsyncRestClient;
import org.oransc.policyagent.repository.ImmutablePolicyType;
-import org.oransc.policyagent.repository.Lock;
import org.oransc.policyagent.repository.Lock.LockType;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.Policy;
@@ -45,6 +40,7 @@
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
/**
* Synchronizes the content of a RIC with the content in the repository. This
@@ -76,36 +72,51 @@
this.services = services;
}
- @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
public void run(Ric ric) {
logger.debug("Handling ric: {}", ric.getConfig().name());
- synchronized (ric) {
- if (ric.getState() == RicState.SYNCHRONIZING) {
- logger.debug("Ric: {} is already being synchronized", ric.getConfig().name());
- return;
- }
- ric.setState(RicState.SYNCHRONIZING);
+ if (ric.getState() == RicState.SYNCHRONIZING) {
+ logger.debug("Ric: {} is already being synchronized", ric.getConfig().name());
+ return;
}
- ric.getLock().lock(LockType.EXCLUSIVE) // Make sure no NBI updates are running
- .flatMap(Lock::unlock) //
+ ric.getLock().lock(LockType.EXCLUSIVE) //
+ .flatMap(notUsed -> setRicState(ric)) //
.flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
- .flatMapMany(client -> startSynchronization(ric, client)) //
+ .flatMapMany(client -> runSynchronization(ric, client)) //
+ .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable))
.subscribe(new BaseSubscriber<Object>() {
@Override
protected void hookOnError(Throwable throwable) {
- startDeleteAllPolicyInstances(ric, throwable);
+ logger.warn("Synchronization failure for ric: {}, reason: {}", ric.name(), throwable.getMessage());
+ ric.setState(RicState.UNDEFINED);
}
@Override
protected void hookOnComplete() {
onSynchronizationComplete(ric);
}
+
+ @Override
+ protected void hookFinally(SignalType type) {
+ ric.getLock().unlockBlocking();
+ }
});
}
- private Flux<Object> startSynchronization(Ric ric, A1Client a1Client) {
+ @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
+ private Mono<Ric> setRicState(Ric ric) {
+ synchronized (ric) {
+ if (ric.getState() == RicState.SYNCHRONIZING) {
+ logger.debug("Ric: {} is already being synchronized", ric.getConfig().name());
+ return Mono.empty();
+ }
+ ric.setState(RicState.SYNCHRONIZING);
+ return Mono.just(ric);
+ }
+ }
+
+ private Flux<Object> runSynchronization(Ric ric, A1Client a1Client) {
Flux<PolicyType> synchronizedTypes = synchronizePolicyTypes(ric, a1Client);
Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies();
Flux<Policy> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
@@ -114,30 +125,27 @@
}
private void onSynchronizationComplete(Ric ric) {
- logger.info("Synchronization completed for: {}", ric.name());
+ logger.debug("Synchronization completed for: {}", ric.name());
ric.setState(RicState.IDLE);
notifyAllServices("Synchronization completed for:" + ric.name());
}
private void notifyAllServices(String body) {
- synchronized (services) {
- for (Service service : services.getAll()) {
- String url = service.getCallbackUrl();
- if (service.getCallbackUrl().length() > 0) {
- createNotificationClient(url) //
- .put("", body) //
- .subscribe( //
- notUsed -> logger.debug("Service {} notified", service.getName()), throwable -> logger
- .warn("Service notification failed for service: {}", service.getName(), throwable),
- () -> logger.debug("All services notified"));
- }
+ for (Service service : services.getAll()) {
+ String url = service.getCallbackUrl();
+ if (service.getCallbackUrl().length() > 0) {
+ createNotificationClient(url) //
+ .put("", body) //
+ .subscribe( //
+ notUsed -> logger.debug("Service {} notified", service.getName()), throwable -> logger
+ .warn("Service notification failed for service: {}", service.getName(), throwable),
+ () -> logger.debug("All services notified"));
}
}
}
- private void startDeleteAllPolicyInstances(Ric ric, Throwable t) {
- logger.warn("Synchronization failed for ric: {}, reason: {}", ric.name(), t.getMessage());
- // If synchronization fails, try to remove all instances
+ private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
+ logger.warn("Recreation of policies failed for ric: {}, reason: {}", ric.name(), t.getMessage());
deleteAllPoliciesInRepository(ric);
Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) //
@@ -146,15 +154,7 @@
.flatMapMany(A1Client::deleteAllPolicies) //
.doOnComplete(() -> deleteAllPoliciesInRepository(ric));
- Flux.concat(synchronizedTypes, deletePoliciesInRic) //
- .subscribe(x -> logger.debug("Brute recovery of failed synchronization: {}", x), //
- throwable -> onDeleteAllPolicyInstancesError(ric, throwable), //
- () -> onSynchronizationComplete(ric));
- }
-
- private void onDeleteAllPolicyInstancesError(Ric ric, Throwable t) {
- logger.warn("Synchronization failure recovery failed for ric: {}, reason: {}", ric.name(), t.getMessage());
- ric.setState(RicState.UNDEFINED);
+ return Flux.concat(synchronizedTypes, deletePoliciesInRic);
}
AsyncRestClient createNotificationClient(final String url) {
@@ -185,11 +185,8 @@
}
private void deleteAllPoliciesInRepository(Ric ric) {
- synchronized (policies) {
- List<Policy> ricPolicies = new ArrayList<>(policies.getForRic(ric.name()));
- for (Policy policy : ricPolicies) {
- this.policies.remove(policy);
- }
+ for (Policy policy : policies.getForRic(ric.name())) {
+ this.policies.remove(policy);
}
}
@@ -200,10 +197,8 @@
}
private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
- synchronized (policies) {
- return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) //
- .flatMap(policy -> putPolicy(policy, ric, a1Client));
- }
+ return Flux.fromIterable(policies.getForRic(ric.name())) //
+ .flatMap(policy -> putPolicy(policy, ric, a1Client));
}
}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java
index 2650992..50e990c 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java
@@ -105,9 +105,7 @@
}
private Flux<Policy> getAllPoliciesForService(Service service) {
- synchronized (policies) {
- return Flux.fromIterable(policies.getForService(service.getName()));
- }
+ return Flux.fromIterable(policies.getForService(service.getName()));
}
private Mono<Policy> deletePolicyInRic(Policy policy) {
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
index 71bd9a5..f701ee5 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
@@ -38,7 +38,6 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -76,13 +75,9 @@
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
-import org.springframework.http.HttpEntity;
-import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.junit.jupiter.SpringExtension;
-import org.springframework.web.client.RestTemplate;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
@@ -180,6 +175,7 @@
policies.clear();
policyTypes.clear();
services.clear();
+ a1ClientFactory.reset();
}
@AfterEach
@@ -454,7 +450,6 @@
@Test
public void testGetPolicies() throws Exception {
- reset();
addPolicy("id1", "type1", "service1");
String url = "/policies";
@@ -642,51 +637,19 @@
return "{\n \"servingCellNrcgi\": \"1\"\n }";
}
- private static class ConcurrencyTestRunnable implements Runnable {
- private final RestTemplate restTemplate = new RestTemplate();
- private final String baseUrl;
- static AtomicInteger nextCount = new AtomicInteger(0);
- private final int count;
- private final RicSupervision supervision;
-
- ConcurrencyTestRunnable(String baseUrl, RicSupervision supervision) {
- this.baseUrl = baseUrl;
- this.count = nextCount.incrementAndGet();
- this.supervision = supervision;
- }
-
- @Override
- public void run() {
- for (int i = 0; i < 100; ++i) {
- if (i % 10 == 0) {
- this.supervision.checkAllRics();
- }
- String name = "policy:" + count + ":" + i;
- putPolicy(name);
- deletePolicy(name);
- }
- }
-
- private void putPolicy(String name) {
- String putUrl = baseUrl + "/policy?type=type1&instance=" + name + "&ric=ric1&service=service1";
- restTemplate.put(putUrl, createJsonHttpEntity("{}"));
- }
-
- private void deletePolicy(String name) {
- String deleteUrl = baseUrl + "/policy?instance=" + name;
- restTemplate.delete(deleteUrl);
- }
- }
-
@Test
public void testConcurrency() throws Exception {
final Instant startTime = Instant.now();
List<Thread> threads = new ArrayList<>();
- addRic("ric1");
- addPolicyType("type1", "ric1");
+ a1ClientFactory.setResponseDelay(Duration.ofMillis(1));
+ addRic("ric");
+ addPolicyType("type1", "ric");
+ addPolicyType("type2", "ric");
for (int i = 0; i < 100; ++i) {
- Thread t = new Thread(new ConcurrencyTestRunnable(baseUrl(), this.supervision), "TestThread_" + i);
+ Thread t =
+ new Thread(new ConcurrencyTestRunnable(baseUrl(), supervision, a1ClientFactory, rics, policyTypes),
+ "TestThread_" + i);
t.start();
threads.add(t);
}
@@ -758,12 +721,6 @@
return ric;
}
- private static HttpEntity<String> createJsonHttpEntity(String content) {
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_JSON);
- return new HttpEntity<String>(content, headers);
- }
-
private static <T> List<T> parseList(String jsonString, Class<T> clazz) {
List<T> result = new ArrayList<>();
JsonArray jsonArr = JsonParser.parseString(jsonString).getAsJsonArray();
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java b/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java
new file mode 100644
index 0000000..26e2091
--- /dev/null
+++ b/policy-agent/src/test/java/org/oransc/policyagent/ConcurrencyTestRunnable.java
@@ -0,0 +1,133 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2020 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.oransc.policyagent.repository.ImmutablePolicy;
+import org.oransc.policyagent.repository.Policy;
+import org.oransc.policyagent.repository.PolicyType;
+import org.oransc.policyagent.repository.PolicyTypes;
+import org.oransc.policyagent.repository.Ric;
+import org.oransc.policyagent.repository.Rics;
+import org.oransc.policyagent.tasks.RicSupervision;
+import org.oransc.policyagent.utils.MockA1Client;
+import org.oransc.policyagent.utils.MockA1ClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.web.client.RestTemplate;
+
+/**
+ * Invoke operations over the NBI and start synchronizations in a separate
+ * thread. For test of robustness using concurrent clients.
+ */
+class ConcurrencyTestRunnable implements Runnable {
+ private static final Logger logger = LoggerFactory.getLogger(ConcurrencyTestRunnable.class);
+ private final RestTemplate restTemplate = new RestTemplate();
+ private final String baseUrl;
+ static AtomicInteger nextCount = new AtomicInteger(0);
+ private final int count;
+ private final RicSupervision supervision;
+ private final MockA1ClientFactory a1ClientFactory;
+ private final Rics rics;
+ private final PolicyTypes types;
+
+ ConcurrencyTestRunnable(String baseUrl, RicSupervision supervision, MockA1ClientFactory a1ClientFactory, Rics rics,
+ PolicyTypes types) {
+ this.baseUrl = baseUrl;
+ this.count = nextCount.incrementAndGet();
+ this.supervision = supervision;
+ this.a1ClientFactory = a1ClientFactory;
+ this.rics = rics;
+ this.types = types;
+ }
+
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < 100; ++i) {
+ if (i % 10 == 0) {
+ createInconsistency();
+ this.supervision.checkAllRics();
+ }
+ String name = "policy:" + count + ":" + i;
+ putPolicy(name);
+ putPolicy(name + "-");
+ listPolicies();
+ listTypes();
+ deletePolicy(name);
+ deletePolicy(name + "-");
+ }
+ } catch (Exception e) {
+ logger.error("Concurrency exception " + e.toString());
+ }
+ }
+
+ private Policy createPolicyObject(String id) {
+ Ric ric = this.rics.get("ric");
+ PolicyType type = this.types.get("type1");
+ return ImmutablePolicy.builder() //
+ .id(id) //
+ .json("{}") //
+ .type(type) //
+ .ric(ric) //
+ .ownerServiceName("") //
+ .lastModified("") //
+ .build();
+ }
+
+ private void createInconsistency() {
+ MockA1Client client = a1ClientFactory.getOrCreateA1Client("ric");
+ Policy policy = createPolicyObject("junk");
+ client.putPolicy(policy).block();
+
+ }
+
+ private void listPolicies() {
+ String uri = baseUrl + "/policies";
+ restTemplate.getForObject(uri, String.class);
+ }
+
+ private void listTypes() {
+ String uri = baseUrl + "/policy_types";
+ restTemplate.getForObject(uri, String.class);
+ }
+
+ private void putPolicy(String name) {
+ String putUrl = baseUrl + "/policy?type=type1&instance=" + name + "&ric=ric&service=service1";
+ restTemplate.put(putUrl, createJsonHttpEntity("{}"));
+ }
+
+ private void deletePolicy(String name) {
+ String deleteUrl = baseUrl + "/policy?instance=" + name;
+ restTemplate.delete(deleteUrl);
+ }
+
+ private static HttpEntity<String> createJsonHttpEntity(String content) {
+ HttpHeaders headers = new HttpHeaders();
+ headers.setContentType(MediaType.APPLICATION_JSON);
+ return new HttpEntity<String>(content, headers);
+ }
+
+}
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
index 481a1fe..2b66a35 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
@@ -131,11 +131,9 @@
private void keepServerAlive() throws InterruptedException {
logger.info("Keeping server alive!");
-
synchronized (this) {
this.wait();
}
-
}
private static String title(String jsonSchema) {
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java
index eae00ea..e0ce2ef 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSynchronizationTaskTest.java
@@ -264,9 +264,7 @@
synchronizerUnderTest.run(RIC_1);
verifyCorrectLogMessage(0, logAppender,
- "Synchronization failed for ric: " + RIC_1_NAME + ", reason: " + originalErrorMessage);
- verifyCorrectLogMessage(1, logAppender,
- "Synchronization failure recovery failed for ric: " + RIC_1_NAME + ", reason: " + originalErrorMessage);
+ "Recreation of policies failed for ric: " + RIC_1_NAME + ", reason: " + originalErrorMessage);
verify(a1ClientMock, times(2)).deleteAllPolicies();
verifyNoMoreInteractions(a1ClientMock);
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java
index 3265d76..0fa5be4 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1Client.java
@@ -46,25 +46,21 @@
@Override
public Mono<List<String>> getPolicyTypeIdentities() {
- synchronized (this.policyTypes) {
- List<String> result = new Vector<>();
- for (PolicyType p : this.policyTypes.getAll()) {
- result.add(p.name());
- }
- return mono(result);
+ List<String> result = new Vector<>();
+ for (PolicyType p : this.policyTypes.getAll()) {
+ result.add(p.name());
}
+ return mono(result);
}
@Override
public Mono<List<String>> getPolicyIdentities() {
- synchronized (this.policies) {
- Vector<String> result = new Vector<>();
- for (Policy policy : policies.getAll()) {
- result.add(policy.id());
- }
-
- return mono(result);
+ Vector<String> result = new Vector<>();
+ for (Policy policy : policies.getAll()) {
+ result.add(policy.id());
}
+
+ return mono(result);
}
@Override
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java
index c1fd8c3..9967958 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java
@@ -72,4 +72,9 @@
this.asynchDelay = delay;
}
+ public void reset() {
+ this.asynchDelay = Duration.ofSeconds(0);
+ clients.clear();
+ }
+
}