Merge "Added a test for service supervision"
diff --git a/policy-agent/config/application.yaml b/policy-agent/config/application.yaml
index 90b73a4..9fb3fba 100644
--- a/policy-agent/config/application.yaml
+++ b/policy-agent/config/application.yaml
@@ -15,7 +15,7 @@
org.springframework: ERROR
org.springframework.data: ERROR
org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
- org.oransc.policyagent: WARN
+ org.oransc.policyagent: INFO
file: /var/log/policy-agent/application.log
app:
filepath: /opt/app/policy-agent/config/application_configuration.json
diff --git a/policy-agent/docs/api.yaml b/policy-agent/docs/api.yaml
index 9c7d96e..a2f1b57 100644
--- a/policy-agent/docs/api.yaml
+++ b/policy-agent/docs/api.yaml
@@ -431,11 +431,11 @@
'200':
description: Policy updated
schema:
- type: string
+ type: object
'201':
description: Policy created
schema:
- type: string
+ type: object
'401':
description: Unauthorized
'403':
@@ -869,6 +869,8 @@
title: RicInfo
ServiceRegistrationInfo:
type: object
+ required:
+ - serviceName
properties:
callbackUrl:
type: string
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
index 1af607f..987cd97 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
@@ -33,7 +33,6 @@
import java.util.List;
import org.oransc.policyagent.clients.A1ClientFactory;
-import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.exceptions.ServiceException;
import org.oransc.policyagent.repository.ImmutablePolicy;
import org.oransc.policyagent.repository.Lock.LockType;
@@ -43,6 +42,8 @@
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
import org.oransc.policyagent.repository.Rics;
+import org.oransc.policyagent.repository.Service;
+import org.oransc.policyagent.repository.Services;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@@ -58,24 +59,21 @@
@Api(tags = "A1 Policy Management")
public class PolicyController {
- private final Rics rics;
- private final PolicyTypes policyTypes;
- private final Policies policies;
- private final A1ClientFactory a1ClientFactory;
+ @Autowired
+ private Rics rics;
+ @Autowired
+ private PolicyTypes policyTypes;
+ @Autowired
+ private Policies policies;
+ @Autowired
+ private A1ClientFactory a1ClientFactory;
+ @Autowired
+ private Services services;
private static Gson gson = new GsonBuilder() //
.serializeNulls() //
.create(); //
- @Autowired
- PolicyController(ApplicationConfig config, PolicyTypes types, Policies policies, Rics rics,
- A1ClientFactory a1ClientFactory) {
- this.policyTypes = types;
- this.policies = policies;
- this.rics = rics;
- this.a1ClientFactory = a1ClientFactory;
- }
-
@GetMapping("/policy_schemas")
@ApiOperation(value = "Returns policy type schema definitions")
@ApiResponses(
@@ -165,8 +163,13 @@
@ApiResponse(code = 423, message = "RIC is locked", response = String.class)})
public Mono<ResponseEntity<Object>> deletePolicy( //
@RequestParam(name = "instance", required = true) String id) {
- Policy policy = policies.get(id);
- if (policy != null && policy.ric().getState() == Ric.RicState.IDLE) {
+ Policy policy;
+ try {
+ policy = policies.getPolicy(id);
+ keepServiceAlive(policy.ownerServiceName());
+ if (policy.ric().getState() != Ric.RicState.IDLE) {
+ return Mono.just(new ResponseEntity<>("Busy, recovering", HttpStatus.LOCKED));
+ }
Ric ric = policy.ric();
return ric.getLock().lock(LockType.SHARED) // //
.flatMap(lock -> a1ClientFactory.createA1Client(policy.ric())) //
@@ -175,9 +178,7 @@
.doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
.doOnError(notUsed -> ric.getLock().unlockBlocking()) //
.flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT)));
- } else if (policy != null) {
- return Mono.just(new ResponseEntity<>("Busy, recovering", HttpStatus.LOCKED));
- } else {
+ } catch (ServiceException e) {
return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
}
@@ -186,8 +187,8 @@
@ApiOperation(value = "Put a policy", response = String.class)
@ApiResponses(
value = { //
- @ApiResponse(code = 201, message = "Policy created"), //
- @ApiResponse(code = 200, message = "Policy updated"), //
+ @ApiResponse(code = 201, message = "Policy created", response = Object.class), //
+ @ApiResponse(code = 200, message = "Policy updated", response = Object.class), //
@ApiResponse(code = 423, message = "RIC is locked", response = String.class), //
@ApiResponse(code = 404, message = "RIC or policy type is not found", response = String.class), //
@ApiResponse(code = 405, message = "Change is not allowed", response = String.class)})
@@ -201,6 +202,7 @@
String jsonString = gson.toJson(jsonBody);
Ric ric = rics.get(ricName);
PolicyType type = policyTypes.get(typeName);
+ keepServiceAlive(service);
if (ric != null && type != null && ric.getState() == Ric.RicState.IDLE) {
Policy policy = ImmutablePolicy.builder() //
.id(instanceId) //
@@ -301,6 +303,13 @@
}
}
+ private void keepServiceAlive(String name) {
+ Service s = this.services.get(name);
+ if (s != null) {
+ s.keepAlive();
+ }
+ }
+
private boolean include(String filter, String value) {
return filter == null || value.equals(filter);
}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java
index 35348c7..3d36228 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java
@@ -57,7 +57,6 @@
private final Policies policies;
private static Gson gson = new GsonBuilder() //
- .serializeNulls() //
.create(); //
@Autowired
@@ -97,6 +96,12 @@
s.getCallbackUrl());
}
+ private void validateRegistrationInfo(ServiceRegistrationInfo registrationInfo) throws ServiceException {
+ if (registrationInfo.serviceName.isEmpty()) {
+ throw new ServiceException("Missing mandatory parameter 'serviceName'");
+ }
+ }
+
@ApiOperation(value = "Register a service")
@ApiResponses(
value = { //
@@ -106,6 +111,7 @@
public ResponseEntity<String> putService(//
@RequestBody ServiceRegistrationInfo registrationInfo) {
try {
+ validateRegistrationInfo(registrationInfo);
this.services.put(toService(registrationInfo));
return new ResponseEntity<>("OK", HttpStatus.OK);
} catch (Exception e) {
@@ -132,18 +138,18 @@
}
}
- @ApiOperation(value = "Keep the policies alive for a service")
+ @ApiOperation(value = "Heartbeat from a service")
@ApiResponses(
value = { //
- @ApiResponse(code = 200, message = "Policies timeout supervision refreshed"),
+ @ApiResponse(code = 200, message = "Service supervision timer refreshed, OK"),
@ApiResponse(code = 404, message = "The service is not found, needs re-registration")})
@PostMapping("/services/keepalive")
public ResponseEntity<String> keepAliveService(//
@RequestParam(name = "name", required = true) String serviceName) {
try {
- services.getService(serviceName).ping();
+ services.getService(serviceName).keepAlive();
return new ResponseEntity<>("OK", HttpStatus.OK);
- } catch (Exception e) {
+ } catch (ServiceException e) {
return new ResponseEntity<>(e.getMessage(), HttpStatus.NOT_FOUND);
}
}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceRegistrationInfo.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceRegistrationInfo.java
index 907fa1c..e532a36 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceRegistrationInfo.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceRegistrationInfo.java
@@ -20,6 +20,8 @@
package org.oransc.policyagent.controllers;
+import com.google.gson.annotations.SerializedName;
+
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
@@ -29,16 +31,23 @@
@ApiModel(value = "ServiceRegistrationInfo")
public class ServiceRegistrationInfo {
- @ApiModelProperty(value = "identity of the service")
- public String serviceName;
+ @ApiModelProperty(value = "identity of the service", required = true, allowEmptyValue = false)
+ @SerializedName(value = "serviceName", alternate = {"name"})
+
+ public String serviceName = "";
@ApiModelProperty(
- value = "keep alive interval for policies owned by the service. 0 means no timeout supervision."
- + " Polcies that are not refreshed within this time are removed")
- public long keepAliveIntervalSeconds;
+ value = "keep alive interval for the service. This is a heartbeat supervision of the service, "
+ + "which in regular intervals must invoke a 'keepAlive' REST call. "
+ + "When a service does not invoke this call within the given time, it is considered unavailable. "
+ + "An unavailable service will be automatically deregistered and its policies will be deleted. "
+ + "Value 0 means no timeout supervision.")
+ @SerializedName("keepAliveIntervalSeconds")
+ public long keepAliveIntervalSeconds = 0;
- @ApiModelProperty(value = "callback for notifying of RIC recovery")
- public String callbackUrl;
+ @ApiModelProperty(value = "callback for notifying of RIC recovery", required = false, allowEmptyValue = true)
+ @SerializedName("callbackUrl")
+ public String callbackUrl = "";
public ServiceRegistrationInfo() {
}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceStatus.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceStatus.java
index 1a9cf32..8f4daac 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceStatus.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceStatus.java
@@ -36,7 +36,7 @@
public final long keepAliveIntervalSeconds;
@ApiModelProperty(value = "time since last invocation by the service")
- public final long timeSincePingSeconds;
+ public final long timeSinceLastActivitySeconds;
@ApiModelProperty(value = "callback for notifying of RIC recovery")
public String callbackUrl;
@@ -44,7 +44,7 @@
ServiceStatus(String name, long keepAliveIntervalSeconds, long timeSincePingSeconds, String callbackUrl) {
this.serviceName = name;
this.keepAliveIntervalSeconds = keepAliveIntervalSeconds;
- this.timeSincePingSeconds = timeSincePingSeconds;
+ this.timeSinceLastActivitySeconds = timeSincePingSeconds;
this.callbackUrl = callbackUrl;
}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java
index f0863a5..7b2c9bd 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java
@@ -36,14 +36,14 @@
this.name = name;
this.keepAliveInterval = keepAliveInterval;
this.callbackUrl = callbackUrl;
- ping();
+ keepAlive();
}
public synchronized Duration getKeepAliveInterval() {
return this.keepAliveInterval;
}
- public synchronized void ping() {
+ public synchronized void keepAlive() {
this.lastPing = Instant.now();
}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java
index 568f002..f6c55dc 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java
@@ -20,6 +20,7 @@
package org.oransc.policyagent.repository;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -50,7 +51,7 @@
}
public synchronized Iterable<Service> getAll() {
- return registeredServices.values();
+ return Collections.unmodifiableCollection(registeredServices.values());
}
public synchronized void remove(String name) {
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java
similarity index 91%
rename from policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java
rename to policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java
index 22905f6..d6013de 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java
@@ -47,8 +47,8 @@
*/
@Component
@EnableScheduling
-public class RepositorySupervision {
- private static final Logger logger = LoggerFactory.getLogger(RepositorySupervision.class);
+public class RicSupervision {
+ private static final Logger logger = LoggerFactory.getLogger(RicSupervision.class);
private final Rics rics;
private final Policies policies;
@@ -57,7 +57,7 @@
private final Services services;
@Autowired
- public RepositorySupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
+ public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
Services services) {
this.rics = rics;
this.policies = policies;
@@ -72,7 +72,9 @@
@Scheduled(fixedRate = 1000 * 60)
public void checkAllRics() {
logger.debug("Checking Rics starting");
- createTask().subscribe(this::onRicChecked, null, this::onComplete);
+ createTask().subscribe(ric -> logger.debug("Ric: {} checked", ric.ric.name()), //
+ null, //
+ () -> logger.debug("Checking Rics completed"));
}
private Flux<RicData> createTask() {
@@ -163,15 +165,6 @@
return Mono.error(new Exception("Syncronization started"));
}
- @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
- private void onRicChecked(RicData ric) {
- logger.debug("Ric: {} checked", ric.ric.name());
- }
-
- private void onComplete() {
- logger.debug("Checking Rics completed");
- }
-
RicSynchronizationTask createSynchronizationTask() {
return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
index 9f88d5c..0a0ab82 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
@@ -103,7 +103,7 @@
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
private void onSynchronizationComplete(Ric ric) {
- logger.debug("Synchronization completed for: {}", ric.name());
+ logger.info("Synchronization completed for: {}", ric.name());
ric.setState(RicState.IDLE);
notifyAllServices("Synchronization completed for:" + ric.name());
}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java
index 8d7062b..4be26eb 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java
@@ -20,7 +20,11 @@
package org.oransc.policyagent.tasks;
+import java.time.Duration;
+
import org.oransc.policyagent.clients.A1ClientFactory;
+import org.oransc.policyagent.repository.Lock;
+import org.oransc.policyagent.repository.Lock.LockType;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.Policy;
import org.oransc.policyagent.repository.Service;
@@ -29,16 +33,17 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
- * Periodically checks that services with a keepAliveInterval set are alive. If a service is deemed not alive,
- * all the service's policies are deleted, both in the repository and in the affected Rics, and the service is
- * removed from the repository. This means that the service needs to register again after this.
+ * Periodically checks that services with a keepAliveInterval set are alive. If
+ * a service is deemed not alive, all the service's policies are deleted, both
+ * in the repository and in the affected Rics, and the service is removed from
+ * the repository. This means that the service needs to register again after
+ * this.
*/
@Component
@EnableScheduling
@@ -47,39 +52,55 @@
private final Services services;
private final Policies policies;
private A1ClientFactory a1ClientFactory;
+ private final Duration checkInterval;
@Autowired
public ServiceSupervision(Services services, Policies policies, A1ClientFactory a1ClientFactory) {
+ this(services, policies, a1ClientFactory, Duration.ofMinutes(1));
+ }
+
+ public ServiceSupervision(Services services, Policies policies, A1ClientFactory a1ClientFactory,
+ Duration checkInterval) {
this.services = services;
this.policies = policies;
this.a1ClientFactory = a1ClientFactory;
+ this.checkInterval = checkInterval;
+ start();
}
- @Scheduled(fixedRate = 1000 * 60)
- public void checkAllServices() {
+ private void start() {
logger.debug("Checking services starting");
- createTask().subscribe(this::onPolicyDeleted, null, this::onComplete);
+ createTask().subscribe(null, null, () -> logger.error("Checking services unexpectedly terminated"));
+ }
+
+ private Flux<?> createTask() {
+ return Flux.interval(this.checkInterval) //
+ .flatMap(notUsed -> checkAllServices());
+ }
+
+ Flux<Policy> checkAllServices() {
+ return Flux.fromIterable(services.getAll()) //
+ .filter(Service::isExpired) //
+ .doOnNext(service -> logger.info("Service is expired: {}", service.getName())) //
+ .doOnNext(service -> services.remove(service.getName())) //
+ .flatMap(this::getAllPoliciesForService) //
+ .flatMap(this::deletePolicy);
}
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
- private void onPolicyDeleted(Policy policy) {
- logger.debug("Policy deleted due to inactivity: {}, service: {}", policy.id(), policy.ownerServiceName());
- }
-
- private void onComplete() {
- logger.debug("Checking services completed");
- }
-
- private Flux<Policy> createTask() {
- synchronized (services) {
- return Flux.fromIterable(services.getAll()) //
- .filter(Service::isExpired) //
- .doOnNext(service -> logger.info("Service is expired: {}", service.getName())) //
- .doOnNext(service -> services.remove(service.getName())) //
- .flatMap(this::getAllPoliciesForService) //
- .doOnNext(policies::remove) //
- .flatMap(this::deletePolicyInRic);
- }
+ private Flux<Policy> deletePolicy(Policy policy) {
+ Lock lock = policy.ric().getLock();
+ return lock.lock(LockType.SHARED) //
+ .doOnNext(notUsed -> policies.remove(policy)) //
+ .flatMap(notUsed -> deletePolicyInRic(policy))
+ .doOnNext(notUsed -> logger.debug("Policy deleted due to service inactivity: {}, service: {}", policy.id(),
+ policy.ownerServiceName())) //
+ .doOnNext(notUsed -> lock.unlockBlocking()) //
+ .doOnError(throwable -> lock.unlockBlocking()) //
+ .doOnError(throwable -> logger.debug("Failed to delete inactive policy: {}, reason: {}", policy.id(),
+ throwable.getMessage())) //
+ .flatMapMany(notUsed -> Flux.just(policy)) //
+ .onErrorResume(throwable -> Flux.empty());
}
private Flux<Policy> getAllPoliciesForService(Service service) {
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
index 377ca79..a5bf3cb 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
@@ -59,9 +59,12 @@
import org.oransc.policyagent.repository.Ric.RicState;
import org.oransc.policyagent.repository.Rics;
import org.oransc.policyagent.repository.Services;
-import org.oransc.policyagent.tasks.RepositorySupervision;
+import org.oransc.policyagent.tasks.RicSupervision;
+import org.oransc.policyagent.tasks.ServiceSupervision;
import org.oransc.policyagent.utils.MockA1Client;
import org.oransc.policyagent.utils.MockA1ClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
@@ -84,6 +87,8 @@
@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
public class ApplicationTest {
+ private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class);
+
@Autowired
ApplicationContext context;
@@ -100,7 +105,7 @@
MockA1ClientFactory a1ClientFactory;
@Autowired
- RepositorySupervision supervision;
+ RicSupervision supervision;
@Autowired
Services services;
@@ -122,6 +127,9 @@
@TestConfiguration
static class TestBeanFactory {
private final PolicyTypes policyTypes = new PolicyTypes();
+ private final Services services = new Services();
+ private final Policies policies = new Policies();
+ MockA1ClientFactory a1ClientFactory = null;
@Bean
public ApplicationConfig getApplicationConfig() {
@@ -130,13 +138,32 @@
@Bean
MockA1ClientFactory getA1ClientFactory() {
- return new MockA1ClientFactory(this.policyTypes);
+ if (a1ClientFactory == null) {
+ this.a1ClientFactory = new MockA1ClientFactory(this.policyTypes);
+ }
+ return this.a1ClientFactory;
}
@Bean
public PolicyTypes getPolicyTypes() {
return this.policyTypes;
}
+
+ @Bean
+ Policies getPolicies() {
+ return this.policies;
+ }
+
+ @Bean
+ Services getServices() {
+ return this.services;
+ }
+
+ @Bean
+ public ServiceSupervision getServiceSupervision() {
+ Duration checkInterval = Duration.ofMillis(1);
+ return new ServiceSupervision(this.services, this.policies, this.getA1ClientFactory(), checkInterval);
+ }
}
@LocalServerPort
@@ -327,7 +354,7 @@
String url = "/policy_schema?id=type1";
String rsp = restClient().get(url).block();
- System.out.println(rsp);
+ logger.info(rsp);
assertThat(rsp).contains("type1");
assertThat(rsp).contains("title");
@@ -361,7 +388,7 @@
String url = "/policies";
String rsp = restClient().get(url).block();
- System.out.println(rsp);
+ logger.info(rsp);
List<PolicyInfo> info = parseList(rsp, PolicyInfo.class);
assertThat(info).size().isEqualTo(1);
PolicyInfo policyInfo = info.get(0);
@@ -379,14 +406,14 @@
String url = "/policies?type=type1";
String rsp = restClient().get(url).block();
- System.out.println(rsp);
+ logger.info(rsp);
assertThat(rsp).contains("id1");
assertThat(rsp).contains("id2");
assertThat(rsp.contains("id3")).isFalse();
url = "/policies?type=type1&service=service2";
rsp = restClient().get(url).block();
- System.out.println(rsp);
+ logger.info(rsp);
assertThat(rsp.contains("id1")).isFalse();
assertThat(rsp).contains("id2");
assertThat(rsp.contains("id3")).isFalse();
@@ -403,7 +430,7 @@
@Test
public void testPutAndGetService() throws Exception {
// PUT
- putService("name");
+ putService("name", 0);
// GET one service
String url = "/services?name=name";
@@ -411,14 +438,14 @@
List<ServiceStatus> info = parseList(rsp, ServiceStatus.class);
assertThat(info.size()).isEqualTo(1);
ServiceStatus status = info.iterator().next();
- assertThat(status.keepAliveIntervalSeconds).isEqualTo(1);
+ assertThat(status.keepAliveIntervalSeconds).isEqualTo(0);
assertThat(status.serviceName).isEqualTo("name");
// GET (all)
url = "/services";
rsp = restClient().get(url).block();
assertThat(rsp.contains("name")).isTrue();
- System.out.println(rsp);
+ logger.info(rsp);
// Keep alive
url = "/services/keepalive?name=name";
@@ -435,13 +462,31 @@
testErrorCode(restClient().post("/services/keepalive?name=name", ""), HttpStatus.NOT_FOUND);
// PUT servive with crap payload
- testErrorCode(restClient().put("/service", "junk"), HttpStatus.BAD_REQUEST);
+ testErrorCode(restClient().put("/service", "crap"), HttpStatus.BAD_REQUEST);
+ testErrorCode(restClient().put("/service", "{}"), HttpStatus.BAD_REQUEST);
// GET non existing servive
testErrorCode(restClient().get("/services?name=XXX"), HttpStatus.NOT_FOUND);
}
@Test
+ public void testServiceSupervision() throws Exception {
+ putService("service1", 1);
+ addPolicyType("type1", "ric1");
+
+ String url = putPolicyUrl("service1", "ric1", "type1", "instance1");
+ final String policyBody = jsonString();
+ restClient().put(url, policyBody).block();
+
+ assertThat(policies.size()).isEqualTo(1);
+ assertThat(services.size()).isEqualTo(1);
+
+ // Timeout after ~1 second
+ await().untilAsserted(() -> assertThat(policies.size()).isEqualTo(0));
+ assertThat(services.size()).isEqualTo(0);
+ }
+
+ @Test
public void testGetPolicyStatus() throws Exception {
addPolicy("id", "typeName", "service1", "ric1");
assertThat(policies.size()).isEqualTo(1);
@@ -471,16 +516,21 @@
return addPolicy(id, typeName, service, "ric");
}
- private String createServiceJson(String name) {
- ServiceRegistrationInfo service = new ServiceRegistrationInfo(name, 1, "callbackUrl");
+ private String createServiceJson(String name, long keepAliveIntervalSeconds) {
+ ServiceRegistrationInfo service = new ServiceRegistrationInfo(name, keepAliveIntervalSeconds, "callbackUrl");
String json = gson.toJson(service);
return json;
}
private void putService(String name) {
+ putService(name, 0);
+ }
+
+ private void putService(String name, long keepAliveIntervalSeconds) {
String url = "/service";
- restClient().put(url, createServiceJson(name)).block();
+ String body = createServiceJson(name, keepAliveIntervalSeconds);
+ restClient().put(url, body).block();
}
private String baseUrl() {
@@ -496,9 +546,9 @@
private final String baseUrl;
static AtomicInteger nextCount = new AtomicInteger(0);
private final int count;
- private final RepositorySupervision supervision;
+ private final RicSupervision supervision;
- ConcurrencyTestRunnable(String baseUrl, RepositorySupervision supervision) {
+ ConcurrencyTestRunnable(String baseUrl, RicSupervision supervision) {
this.baseUrl = baseUrl;
this.count = nextCount.incrementAndGet();
this.supervision = supervision;
@@ -543,7 +593,7 @@
t.join();
}
assertThat(policies.size()).isEqualTo(0);
- System.out.println("Concurrency test took " + Duration.between(startTime, Instant.now()));
+ logger.info("Concurrency test took " + Duration.between(startTime, Instant.now()));
}
private AsyncRestClient restClient() {
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
index b0f1e7f..efbd576 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
@@ -37,6 +37,8 @@
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Rics;
import org.oransc.policyagent.utils.MockA1ClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
@@ -48,6 +50,7 @@
@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
public class MockPolicyAgent {
+ private static final Logger logger = LoggerFactory.getLogger(MockPolicyAgent.class);
@Autowired
Rics rics;
@@ -117,7 +120,7 @@
PolicyType type = ImmutablePolicyType.builder().name(typeName).schema(schema).build();
policyTypes.put(type);
} catch (Exception e) {
- System.out.println("Could not load json schema " + e);
+ logger.error("Could not load json schema ", e);
}
}
}
@@ -127,13 +130,13 @@
private int port;
private void keepServerAlive() {
- System.out.println("Keeping server alive!");
+ logger.info("Keeping server alive!");
try {
synchronized (this) {
this.wait();
}
} catch (Exception ex) {
- System.out.println("Unexpected: " + ex.toString());
+ logger.error("Unexpected: " + ex);
}
}
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java
index d034d4d..52147a8 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java
@@ -52,6 +52,8 @@
import org.oransc.policyagent.repository.ImmutablePolicyType;
import org.oransc.policyagent.repository.PolicyType;
import org.oransc.policyagent.utils.LoggingUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.reactive.function.client.WebClientResponseException;
@@ -60,7 +62,7 @@
import reactor.test.StepVerifier;
public class DmaapMessageHandlerTest {
-
+ private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandlerTest.class);
private static final String URL = "url";
private final MRBatchingPublisher dmaapClient = mock(MRBatchingPublisher.class);
@@ -112,13 +114,13 @@
@Test
public void testMessageParsing() {
String message = dmaapInputMessage(Operation.DELETE);
- System.out.println(message);
+ logger.info(message);
DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
assertTrue(parsedMessage != null);
assertFalse(parsedMessage.payload().isPresent());
message = dmaapInputMessage(Operation.PUT);
- System.out.println(message);
+ logger.info(message);
parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
assertTrue(parsedMessage != null);
assertTrue(parsedMessage.payload().isPresent());
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/repository/LockTest.java b/policy-agent/src/test/java/org/oransc/policyagent/repository/LockTest.java
index 825010a..6fd6c8b 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/repository/LockTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/repository/LockTest.java
@@ -71,10 +71,8 @@
Lock lock = new Lock();
Mono<Lock> seq = lock.lock(LockType.EXCLUSIVE) //
- .doOnNext(l -> System.out.println("1 " + l)) //
.flatMap(l -> lock.lock(LockType.EXCLUSIVE)) //
- .flatMap(l -> lock.unlock()) //
- .doOnNext(l -> System.out.println("2 " + l)); //
+ .flatMap(l -> lock.unlock());
asynchUnlock(lock);
StepVerifier.create(seq) //
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java
similarity index 88%
rename from policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java
rename to policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java
index d837f78..8d3dd94 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java
@@ -56,7 +56,7 @@
import reactor.core.publisher.Mono;
@ExtendWith(MockitoExtension.class)
-public class RepositorySupervisionTest {
+public class RicSupervisionTest {
private static final String POLICY_TYPE_1_NAME = "type1";
private static final PolicyType POLICY_TYPE_1 = ImmutablePolicyType.builder() //
.name(POLICY_TYPE_1_NAME) //
@@ -133,8 +133,7 @@
setUpGetPolicyIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_1_ID)));
setUpGetPolicyTypeIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_TYPE_1_NAME)));
- RepositorySupervision supervisorUnderTest =
- spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+ RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
supervisorUnderTest.checkAllRics();
@@ -147,8 +146,7 @@
RIC_1.setState(RicState.UNDEFINED);
rics.put(RIC_1);
- RepositorySupervision supervisorUnderTest =
- spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+ RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
@@ -165,8 +163,7 @@
RIC_1.setState(RicState.SYNCHRONIZING);
rics.put(RIC_1);
- RepositorySupervision supervisorUnderTest =
- spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+ RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
supervisorUnderTest.checkAllRics();
@@ -182,8 +179,7 @@
setUpGetPolicyIdentitiesToReturn(new Exception("Failed"));
- RepositorySupervision supervisorUnderTest =
- spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+ RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
@@ -200,8 +196,7 @@
setUpGetPolicyIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_1_ID)));
- RepositorySupervision supervisorUnderTest =
- spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+ RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
@@ -223,8 +218,7 @@
setUpGetPolicyIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_1_ID, "Another_policy")));
- RepositorySupervision supervisorUnderTest =
- spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+ RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
@@ -245,8 +239,7 @@
setUpGetPolicyIdentitiesToReturn(Collections.emptyList());
setUpGetPolicyTypeIdentitiesToReturn(new Exception("Failed"));
- RepositorySupervision supervisorUnderTest =
- spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+ RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
@@ -264,8 +257,7 @@
setUpGetPolicyIdentitiesToReturn(Collections.emptyList());
setUpGetPolicyTypeIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_TYPE_1_NAME, "another_policy_type")));
- RepositorySupervision supervisorUnderTest =
- spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+ RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
@@ -292,8 +284,7 @@
setUpGetPolicyIdentitiesToReturn(Collections.emptyList());
setUpGetPolicyTypeIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_TYPE_1_NAME, "another_policy_type")));
- RepositorySupervision supervisorUnderTest =
- spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+ RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java
index a984528..070e8da 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java
@@ -101,7 +101,7 @@
await().atMost(Durations.FIVE_SECONDS).with().pollInterval(Durations.ONE_SECOND).until(service::isExpired);
- serviceSupervisionUnderTest.checkAllServices();
+ serviceSupervisionUnderTest.checkAllServices().blockLast();
assertThat(policies.size()).isEqualTo(0);
assertThat(services.size()).isEqualTo(0);
@@ -125,7 +125,7 @@
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ServiceSupervision.class, WARN);
- serviceSupervisionUnderTest.checkAllServices();
+ serviceSupervisionUnderTest.checkAllServices().blockLast();
assertThat(policies.size()).isEqualTo(0);
assertThat(services.size()).isEqualTo(0);
@@ -143,7 +143,7 @@
ServiceSupervision serviceSupervisionUnderTest =
new ServiceSupervision(services, policies, a1ClientFactoryMock);
- serviceSupervisionUnderTest.checkAllServices();
+ serviceSupervisionUnderTest.checkAllServices().blockLast();
assertThat(policies.size()).isEqualTo(1);
assertThat(services.size()).isEqualTo(1);
@@ -159,7 +159,7 @@
ServiceSupervision serviceSupervisionUnderTest =
new ServiceSupervision(services, policies, a1ClientFactoryMock);
- serviceSupervisionUnderTest.checkAllServices();
+ serviceSupervisionUnderTest.checkAllServices().blockLast();
assertThat(policies.size()).isEqualTo(1);
assertThat(services.size()).isEqualTo(1);