Changed so that registration never gives up
This includes registration of types, registration of producer and creation of the input job.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-853
Change-Id: I3f3cafaa73c23100d6c0c7602be75e3b0f6f45d9
diff --git a/pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java b/pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java
index 4bd95a1..5b10241 100644
--- a/pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java
+++ b/pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java
@@ -82,16 +82,13 @@
@Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
public void runSupervisionTask() {
- supervisionTask().subscribe( //
+ if (this.isRegisteredInIcs) {
+ return;
+ }
+ registerTypesAndProducer().subscribe( //
null, //
- this::handleRegistrationFailure, //
- this::handleRegistrationCompleted);
- }
-
- public Mono<String> supervisionTask() {
- return checkRegistration() //
- .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) //
- .flatMap(isRegisterred -> registerTypesAndProducer());
+ this::handleRegistrationFailure//
+ );
}
private void handleRegistrationCompleted() {
@@ -107,28 +104,11 @@
return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + producerId;
}
- // Returns TRUE if registration is correct
- private Mono<Boolean> checkRegistration() {
- return restClient.get(producerRegistrationUrl()) //
- .flatMap(this::isRegisterredInfoCorrect) //
- .onErrorResume(t -> Mono.just(Boolean.FALSE));
- }
-
- private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
- ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
- if (isEqual(producerRegistrationInfo(), registerredInfo)) {
- logger.trace("Already registered in ICS");
- return Mono.just(Boolean.TRUE);
- } else {
- return Mono.just(Boolean.FALSE);
- }
- }
-
private String registerTypeUrl(InfoType type) {
return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
}
- private Mono<String> registerTypesAndProducer() {
+ public Mono<String> registerTypesAndProducer() {
final int CONCURRENCY = 1;
return Flux.fromIterable(this.types.getAll()) //
@@ -138,7 +118,8 @@
CONCURRENCY) //
.collectList() //
.doOnNext(type -> logger.info("Registering producer")) //
- .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo())));
+ .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo())))
+ .doOnNext(n -> handleRegistrationCompleted());
}
private Mono<InfoType> createInputDataJob(InfoType type) {
@@ -155,7 +136,6 @@
return restClient.put(consumerJobUrl(JOB_ID), body)
.doOnError(t -> logger.error("Could not create job of type {}, reason: {}", type.getInputJobType(),
t.getMessage()))
- .onErrorResume(t -> Mono.just("")) //
.doOnNext(n -> logger.info("Created input job: {}, type: {}", JOB_ID, type.getInputJobType())) //
.map(x -> type);
}
diff --git a/pmproducer/src/test/java/org/oran/pmproducer/ApplicationTest.java b/pmproducer/src/test/java/org/oran/pmproducer/ApplicationTest.java
index 8cba2ca..dd38ffc 100644
--- a/pmproducer/src/test/java/org/oran/pmproducer/ApplicationTest.java
+++ b/pmproducer/src/test/java/org/oran/pmproducer/ApplicationTest.java
@@ -257,9 +257,10 @@
}
private void waitForRegistration() {
+ producerRegistrationTask.registerTypesAndProducer().block();
// Register producer, Register types
await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
- producerRegistrationTask.supervisionTask().block();
+ producerRegistrationTask.registerTypesAndProducer().block();
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
assertThat(producerRegistrationTask.isRegisteredInIcs()).isTrue();
@@ -430,23 +431,6 @@
}
@Test
- void testReRegister() throws Exception {
- // Wait foir register types and producer
- waitForRegistration();
-
- // Clear the registration, should trigger a re-register
- icsSimulatorController.testResults.reset();
- producerRegistrationTask.supervisionTask().block();
- await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
-
- // Just clear the registerred types, should trigger a re-register
- icsSimulatorController.testResults.types.clear();
- await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds)
- .hasSize(this.types.size()));
- }
-
- @Test
@SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
void testZZActuator() throws Exception {
// The test must be run last, hence the "ZZ" in the name. All succeeding tests