Merge "Added support for EiJob status"
diff --git a/enrichment-coordinator-service/docs/api.json b/enrichment-coordinator-service/docs/api.json
index 71f64db..d5cbad5 100644
--- a/enrichment-coordinator-service/docs/api.json
+++ b/enrichment-coordinator-service/docs/api.json
@@ -43,6 +43,37 @@
}],
"tags": ["A1-EI (enrichment information)"]
}},
+ "/consumer_simulator/eijobs/{eiJobId}/status": {"post": {
+ "summary": "Callback for EI job status",
+ "deprecated": false,
+ "produces": ["application/json"],
+ "operationId": "jobStatusCallbackUsingPOST",
+ "responses": {
+ "200": {"description": "OK"},
+ "201": {"description": "Created"},
+ "401": {"description": "Unauthorized"},
+ "403": {"description": "Forbidden"},
+ "404": {"description": "Not Found"}
+ },
+ "parameters": [
+ {
+ "in": "path",
+ "name": "eiJobId",
+ "description": "eiJobId",
+ "type": "string",
+ "required": true
+ },
+ {
+ "schema": {"$ref": "#/definitions/EiJobStatusObject"},
+ "in": "body",
+ "name": "status",
+ "description": "status",
+ "required": true
+ }
+ ],
+ "tags": ["Consumer Callbacks"],
+ "consumes": ["application/json"]
+ }},
"/ei-producer/v1/eitypes": {"get": {
"summary": "EI type identifiers",
"deprecated": false,
@@ -479,7 +510,7 @@
"consumes": ["application/json"]
}}
},
- "host": "localhost:42127",
+ "host": "localhost:38499",
"definitions": {
"producer_ei_job_request": {
"description": "The body of the EI producer callbacks for EI job creation and deletion",
@@ -698,6 +729,10 @@
"description": "Consumer Controller"
},
{
+ "name": "Consumer Callbacks",
+ "description": "Consumer Simulator Controller"
+ },
+ {
"name": "Enrichment Data Producer API",
"description": "Producer Controller"
},
diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/BeanFactory.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/BeanFactory.java
index ce41956..f4cf9dc 100644
--- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/BeanFactory.java
+++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/BeanFactory.java
@@ -23,7 +23,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.catalina.connector.Connector;
-import org.oransc.enrichment.clients.ProducerCallbacks;
import org.oransc.enrichment.configuration.ApplicationConfig;
import org.oransc.enrichment.repository.EiJobs;
import org.oransc.enrichment.repository.EiProducers;
@@ -76,11 +75,6 @@
return this.applicationConfig;
}
- @Bean
- public ProducerCallbacks getProducerCallbacks() {
- return new ProducerCallbacks(this.applicationConfig);
- }
-
private static Connector getHttpConnector(int httpPort) {
Connector connector = new Connector(TomcatServletWebServerFactory.DEFAULT_PROTOCOL);
connector.setScheme("http");
diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerCallbacks.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerCallbacks.java
new file mode 100644
index 0000000..cded953
--- /dev/null
+++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerCallbacks.java
@@ -0,0 +1,89 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2020 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.enrichment.controllers.consumer;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import java.lang.invoke.MethodHandles;
+
+import org.oransc.enrichment.clients.AsyncRestClient;
+import org.oransc.enrichment.clients.AsyncRestClientFactory;
+import org.oransc.enrichment.configuration.ApplicationConfig;
+import org.oransc.enrichment.repository.EiJob;
+import org.oransc.enrichment.repository.EiJobs;
+import org.oransc.enrichment.repository.EiProducer;
+import org.oransc.enrichment.repository.EiType;
+import org.oransc.enrichment.repository.EiTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/** Notifies EI job owners (consumers) of job status changes by POSTing to each job's status callback URL. */
+@Component
+@SuppressWarnings("java:S3457") // No need to call "toString()" method as formatting and string ..
+public class ConsumerCallbacks {
+
+    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private static final Gson gson = new GsonBuilder().create();
+
+    private final AsyncRestClient restClient;
+    private final EiTypes eiTypes;
+    private final EiJobs eiJobs;
+
+    @Autowired
+    public ConsumerCallbacks(ApplicationConfig config, EiTypes eiTypes, EiJobs eiJobs) {
+        AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config.getWebClientConfig());
+        this.restClient = restClientFactory.createRestClient("");
+        this.eiTypes = eiTypes;
+        this.eiJobs = eiJobs;
+    }
+
+    /** Sends DISABLED for every job of each of the producer's types that is no longer present in the EiTypes. */
+    public void notifyConsumersProducerDeleted(EiProducer eiProducer) {
+        for (EiType type : eiProducer.getEiTypes()) {
+            if (this.eiTypes.get(type.getId()) == null) {
+                for (EiJob job : this.eiJobs.getJobsForType(type)) {
+                    notifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED));
+                }
+            }
+        }
+    }
+
+    /** Sends ENABLED for every job that is stored for the given EI type. */
+    public void notifyConsumersTypeAdded(EiType eiType) {
+        for (EiJob job : this.eiJobs.getJobsForType(eiType)) {
+            notifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.ENABLED));
+        }
+    }
+
+    /** POSTs the status as JSON to the job's status URL (skipped when the URL is empty); the outcome is only logged. */
+    private void notifyJobOwner(EiJob job, ConsumerEiJobStatus status) {
+        if (!job.jobStatusUrl().isEmpty()) {
+            String body = gson.toJson(status);
+            this.restClient.post(job.jobStatusUrl(), body) //
+                .subscribe(notUsed -> logger.debug("Consumer notified OK {}", job.id()), //
+                    throwable -> logger.warn("Consumer notify failed {} {}", job.jobStatusUrl(), throwable.toString()), //
+                    null);
+        }
+    }
+}
diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerController.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerController.java
index bead826..b194dc1 100644
--- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerController.java
+++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerController.java
@@ -31,18 +31,21 @@
import io.swagger.annotations.ApiResponses;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.Vector;
import org.everit.json.schema.Schema;
import org.everit.json.schema.loader.SchemaLoader;
import org.json.JSONObject;
-import org.oransc.enrichment.clients.ProducerCallbacks;
import org.oransc.enrichment.configuration.ApplicationConfig;
import org.oransc.enrichment.controllers.ErrorResponse;
import org.oransc.enrichment.controllers.VoidResponse;
+import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
import org.oransc.enrichment.exceptions.ServiceException;
import org.oransc.enrichment.repository.EiJob;
import org.oransc.enrichment.repository.EiJobs;
+import org.oransc.enrichment.repository.EiProducer;
import org.oransc.enrichment.repository.EiType;
import org.oransc.enrichment.repository.EiTypes;
import org.oransc.enrichment.repository.ImmutableEiJob;
@@ -78,8 +81,7 @@
@Autowired
ProducerCallbacks producerCallbacks;
- private static Gson gson = new GsonBuilder() //
- .create(); //
+ private static Gson gson = new GsonBuilder().create();
@GetMapping(path = "/eitypes", produces = MediaType.APPLICATION_JSON_VALUE)
@ApiOperation(value = "EI type identifiers", notes = "")
@@ -149,7 +151,7 @@
List<String> result = new ArrayList<>();
if (owner != null) {
for (EiJob job : this.eiJobs.getJobsForOwner(owner)) {
- if (eiTypeId == null || job.type().getId().equals(eiTypeId)) {
+ if (eiTypeId == null || job.typeId().equals(eiTypeId)) {
result.add(job.id());
}
}
@@ -204,9 +206,21 @@
}
}
+ private Collection<EiProducer> getProducers(EiJob eiJob) {
+ try {
+ return this.eiTypes.getType(eiJob.typeId()).getProducers();
+ } catch (Exception e) {
+ return new Vector<>();
+ }
+ }
+
private ConsumerEiJobStatus toEiJobStatus(EiJob job) {
- // TODO
- return new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
+ for (EiProducer producer : getProducers(job)) {
+ if (producer.isAvailable()) {
+ return new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
+ }
+ }
+ return new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
}
@DeleteMapping(path = "/eijobs/{eiJobId}", produces = MediaType.APPLICATION_JSON_VALUE)
@@ -274,7 +288,7 @@
validateJsonObjectAgainstSchema(eiType.getJobDataSchema(), eiJobInfo.jobData);
EiJob existingEiJob = this.eiJobs.get(eiJobId);
- if (existingEiJob != null && !existingEiJob.type().getId().equals(eiJobInfo.eiTypeId)) {
+ if (existingEiJob != null && !existingEiJob.typeId().equals(eiJobInfo.eiTypeId)) {
throw new ServiceException("Not allowed to change type for existing EI job", HttpStatus.CONFLICT);
}
return Mono.just(toEiJob(eiJobInfo, eiJobId, eiType));
@@ -301,15 +315,14 @@
}
}
- // Status TBD
-
private EiJob toEiJob(ConsumerEiJobInfo info, String id, EiType type) {
return ImmutableEiJob.builder() //
.id(id) //
- .type(type) //
+ .typeId(type.getId()) //
.owner(info.owner) //
.jobData(info.jobData) //
- .targetUri(info.targetUri) //
+ .targetUrl(info.targetUri) //
+ .jobStatusUrl(info.statusNotificationUri == null ? "" : info.statusNotificationUri) //
.build();
}
@@ -318,6 +331,6 @@
}
private ConsumerEiJobInfo toEiJobInfo(EiJob s) {
- return new ConsumerEiJobInfo(s.type().getId(), s.jobData(), s.owner(), s.targetUri());
+ return new ConsumerEiJobInfo(s.typeId(), s.jobData(), s.owner(), s.targetUrl(), s.jobStatusUrl());
}
}
diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerEiJobInfo.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerEiJobInfo.java
index d6996ce..d88091f 100644
--- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerEiJobInfo.java
+++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerEiJobInfo.java
@@ -60,10 +60,12 @@
public ConsumerEiJobInfo() {
}
- public ConsumerEiJobInfo(String eiTypeId, Object jobData, String owner, String targetUri) {
+ public ConsumerEiJobInfo(String eiTypeId, Object jobData, String owner, String targetUri,
+ String statusNotificationUri) {
this.eiTypeId = eiTypeId;
this.jobData = jobData;
this.owner = owner;
this.targetUri = targetUri;
+ this.statusNotificationUri = statusNotificationUri;
}
}
diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerEiJobStatus.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerEiJobStatus.java
index b2f657f..60752ec 100644
--- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerEiJobStatus.java
+++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerEiJobStatus.java
@@ -45,7 +45,10 @@
@ApiModelProperty(value = OPERATIONAL_STATE_DESCRIPTION, name = "eiJobStatus", required = true)
@SerializedName("eiJobStatus")
@JsonProperty(value = "eiJobStatus", required = true)
- public final EiJobStatusValues state;
+ public EiJobStatusValues state;
+
+ public ConsumerEiJobStatus() {
+ }
public ConsumerEiJobStatus(EiJobStatusValues state) {
this.state = state;
diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/clients/ProducerCallbacks.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerCallbacks.java
similarity index 79%
rename from enrichment-coordinator-service/src/main/java/org/oransc/enrichment/clients/ProducerCallbacks.java
rename to enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerCallbacks.java
index 87d1dba..e00ac74 100644
--- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/clients/ProducerCallbacks.java
+++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerCallbacks.java
@@ -18,18 +18,25 @@
* ========================LICENSE_END===================================
*/
-package org.oransc.enrichment.clients;
+package org.oransc.enrichment.controllers.producer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.Vector;
+import org.oransc.enrichment.clients.AsyncRestClient;
+import org.oransc.enrichment.clients.AsyncRestClientFactory;
import org.oransc.enrichment.configuration.ApplicationConfig;
import org.oransc.enrichment.repository.EiJob;
import org.oransc.enrichment.repository.EiProducer;
+import org.oransc.enrichment.repository.EiTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -37,6 +44,7 @@
/**
* Callbacks to the EiProducer
*/
+@Component
@SuppressWarnings("java:S3457") // No need to call "toString()" method as formatting and string ..
public class ProducerCallbacks {
@@ -44,16 +52,19 @@
private static Gson gson = new GsonBuilder().create();
private final AsyncRestClient restClient;
+ private final EiTypes eiTypes;
- public ProducerCallbacks(ApplicationConfig config) {
+ @Autowired
+ public ProducerCallbacks(ApplicationConfig config, EiTypes eiTypes) {
AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config.getWebClientConfig());
this.restClient = restClientFactory.createRestClient("");
+ this.eiTypes = eiTypes;
}
public void notifyProducersJobDeleted(EiJob eiJob) {
ProducerJobInfo request = new ProducerJobInfo(eiJob);
String body = gson.toJson(request);
- for (EiProducer producer : eiJob.type().getProducers()) {
+ for (EiProducer producer : getProducers(eiJob)) {
restClient.post(producer.getJobDeletionCallbackUrl(), body) //
.subscribe(notUsed -> logger.debug("Job deleted OK {}", producer.getId()), //
throwable -> logger.warn("Job delete failed {}", producer.getId(), throwable.toString()), null);
@@ -67,7 +78,7 @@
* @return the number of producers that returned OK
*/
public Mono<Integer> notifyProducersJobStarted(EiJob eiJob) {
- return Flux.fromIterable(eiJob.type().getProducers()) //
+ return Flux.fromIterable(getProducers(eiJob)) //
.flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob)) //
.collectList() //
.flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
@@ -92,4 +103,12 @@
});
}
+ private Collection<EiProducer> getProducers(EiJob eiJob) {
+ try {
+ return this.eiTypes.getType(eiJob.typeId()).getProducers();
+ } catch (Exception e) {
+ return new Vector<>();
+ }
+ }
+
}
diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerController.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerController.java
index 306e3a9..9bfa2d2 100644
--- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerController.java
+++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerController.java
@@ -33,10 +33,9 @@
import java.util.Collection;
import java.util.List;
-import org.oransc.enrichment.clients.ProducerCallbacks;
-import org.oransc.enrichment.clients.ProducerJobInfo;
import org.oransc.enrichment.controllers.ErrorResponse;
import org.oransc.enrichment.controllers.VoidResponse;
+import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks;
import org.oransc.enrichment.controllers.producer.ProducerRegistrationInfo.ProducerEiTypeRegistrationInfo;
import org.oransc.enrichment.repository.EiJob;
import org.oransc.enrichment.repository.EiJobs;
@@ -78,6 +77,9 @@
@Autowired
ProducerCallbacks producerCallbacks;
+ @Autowired
+ ConsumerCallbacks consumerCallbacks;
+
@GetMapping(path = ProducerConsts.API_ROOT + "/eitypes", produces = MediaType.APPLICATION_JSON_VALUE)
@ApiOperation(value = "EI type identifiers", notes = "")
@ApiResponses(
@@ -214,6 +216,7 @@
ProducerStatusInfo.OperationalState opState =
producer.isAvailable() ? ProducerStatusInfo.OperationalState.ENABLED
: ProducerStatusInfo.OperationalState.DISABLED;
+ this.logger.debug("opState {}", opState);
return new ProducerStatusInfo(opState);
}
@@ -251,7 +254,7 @@
private void purgeTypes(Collection<EiType> types) {
for (EiType type : types) {
if (type.getProducerIds().isEmpty()) {
- this.eiTypes.deregisterType(type, this.eiJobs);
+ this.eiTypes.remove(type);
}
}
}
@@ -269,6 +272,7 @@
try {
final EiProducer producer = this.eiProducers.getProducer(eiProducerId);
this.eiProducers.deregisterProducer(producer, this.eiTypes, this.eiJobs);
+ this.consumerCallbacks.notifyConsumersProducerDeleted(producer);
return new ResponseEntity<>(HttpStatus.NO_CONTENT);
} catch (Exception e) {
return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
@@ -280,6 +284,7 @@
if (type == null) {
type = new EiType(typeInfo.eiTypeId, typeInfo.jobDataSchema);
this.eiTypes.put(type);
+ this.consumerCallbacks.notifyConsumersTypeAdded(type);
}
return type;
}
diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/clients/ProducerJobInfo.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerJobInfo.java
similarity index 94%
rename from enrichment-coordinator-service/src/main/java/org/oransc/enrichment/clients/ProducerJobInfo.java
rename to enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerJobInfo.java
index ada16e2..a2f5b89 100644
--- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/clients/ProducerJobInfo.java
+++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerJobInfo.java
@@ -18,7 +18,7 @@
* ========================LICENSE_END===================================
*/
-package org.oransc.enrichment.clients;
+package org.oransc.enrichment.controllers.producer;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.gson.annotations.SerializedName;
@@ -63,7 +63,7 @@
}
public ProducerJobInfo(EiJob job) {
- this(job.jobData(), job.id(), job.type().getId(), job.targetUri());
+ this(job.jobData(), job.id(), job.typeId(), job.targetUrl());
}
public ProducerJobInfo() {
diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJob.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJob.java
index 8c1206f..95bbc03 100644
--- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJob.java
+++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJob.java
@@ -32,11 +32,13 @@
String id();
- EiType type();
+ String typeId();
String owner();
Object jobData();
- String targetUri();
+ String targetUrl();
+
+ String jobStatusUrl();
}
diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJobs.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJobs.java
index f0e4051..706c8dd 100644
--- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJobs.java
+++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJobs.java
@@ -38,7 +38,7 @@
public synchronized void put(EiJob job) {
allEiJobs.put(job.id(), job);
- jobsByType.put(job.type().getId(), job.id(), job);
+ jobsByType.put(job.typeId(), job.id(), job);
jobsByOwner.put(job.owner(), job.id(), job);
}
@@ -80,7 +80,7 @@
public synchronized void remove(EiJob job) {
this.allEiJobs.remove(job.id());
- jobsByType.remove(job.type().getId(), job.id());
+ jobsByType.remove(job.typeId(), job.id());
jobsByOwner.remove(job.owner(), job.id());
}
diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducers.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducers.java
index b3cd895..801e7fc 100644
--- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducers.java
+++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducers.java
@@ -36,11 +36,10 @@
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class EiProducers {
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private Map<String, EiProducer> allEiProducers = new HashMap<>();
+ private final Map<String, EiProducer> allEiProducers = new HashMap<>();
public synchronized void put(EiProducer producer) {
allEiProducers.put(producer.getId(), producer);
-
}
public synchronized Collection<EiProducer> getAllProducers() {
@@ -79,7 +78,7 @@
this.logger.error("Bug, no producer found");
}
if (type.getProducerIds().isEmpty()) {
- eiTypes.deregisterType(type, eiJobs);
+ eiTypes.remove(type);
}
}
}
diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiTypes.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiTypes.java
index 6397c2f..d0bf53a 100644
--- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiTypes.java
+++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiTypes.java
@@ -73,13 +73,4 @@
public synchronized void clear() {
this.allEiTypes.clear();
}
-
- public void deregisterType(EiType type, EiJobs eiJobs) {
- this.remove(type);
- for (EiJob job : eiJobs.getJobsForType(type.getId())) {
- eiJobs.remove(job);
- this.logger.warn("Deleted job {} because no producers left", job.id());
- }
- }
-
}
diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java
index f239a48..e242166 100644
--- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java
+++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java
@@ -23,6 +23,7 @@
import org.oransc.enrichment.clients.AsyncRestClient;
import org.oransc.enrichment.clients.AsyncRestClientFactory;
import org.oransc.enrichment.configuration.ApplicationConfig;
+import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks;
import org.oransc.enrichment.repository.EiJobs;
import org.oransc.enrichment.repository.EiProducer;
import org.oransc.enrichment.repository.EiProducers;
@@ -50,15 +51,17 @@
private final EiJobs eiJobs;
private final EiTypes eiTypes;
private final AsyncRestClient restClient;
+ private final ConsumerCallbacks consumerCallbacks;
@Autowired
public ProducerSupervision(ApplicationConfig applicationConfig, EiProducers eiProducers, EiJobs eiJobs,
- EiTypes eiTypes) {
+ EiTypes eiTypes, ConsumerCallbacks consumerCallbacks) {
AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
this.restClient = restClientFactory.createRestClient("");
this.eiJobs = eiJobs;
this.eiProducers = eiProducers;
this.eiTypes = eiTypes;
+ this.consumerCallbacks = consumerCallbacks;
}
@Scheduled(fixedRate = 1000 * 60 * 5)
@@ -87,6 +90,7 @@
producer.setAliveStatus(false);
if (producer.isDead()) {
this.eiProducers.deregisterProducer(producer, this.eiTypes, this.eiJobs);
+ this.consumerCallbacks.notifyConsumersProducerDeleted(producer);
}
}
diff --git a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java
index e272faa..e707fb7 100644
--- a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java
+++ b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java
@@ -43,15 +43,17 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.oransc.enrichment.clients.AsyncRestClient;
import org.oransc.enrichment.clients.AsyncRestClientFactory;
-import org.oransc.enrichment.clients.ProducerJobInfo;
import org.oransc.enrichment.configuration.ApplicationConfig;
import org.oransc.enrichment.configuration.ImmutableWebClientConfig;
import org.oransc.enrichment.configuration.WebClientConfig;
+import org.oransc.enrichment.controller.ConsumerSimulatorController;
import org.oransc.enrichment.controller.ProducerSimulatorController;
import org.oransc.enrichment.controllers.consumer.ConsumerConsts;
import org.oransc.enrichment.controllers.consumer.ConsumerEiJobInfo;
+import org.oransc.enrichment.controllers.consumer.ConsumerEiJobStatus;
import org.oransc.enrichment.controllers.consumer.ConsumerEiTypeInfo;
import org.oransc.enrichment.controllers.producer.ProducerConsts;
+import org.oransc.enrichment.controllers.producer.ProducerJobInfo;
import org.oransc.enrichment.controllers.producer.ProducerRegistrationInfo;
import org.oransc.enrichment.controllers.producer.ProducerRegistrationInfo.ProducerEiTypeRegistrationInfo;
import org.oransc.enrichment.controllers.producer.ProducerStatusInfo;
@@ -91,6 +93,7 @@
private final String EI_TYPE_ID = "typeId";
private final String EI_PRODUCER_ID = "producerId";
private final String EI_JOB_PROPERTY = "\"property1\"";
+ private final String EI_JOB_ID = "jobId";
@Autowired
ApplicationContext context;
@@ -111,6 +114,9 @@
ProducerSimulatorController producerSimulator;
@Autowired
+ ConsumerSimulatorController consumerSimulator;
+
+ @Autowired
ProducerSupervision producerSupervision;
private static Gson gson = new GsonBuilder().create();
@@ -135,6 +141,7 @@
this.eiTypes.clear();
this.eiProducers.clear();
this.producerSimulator.getTestResults().reset();
+ this.consumerSimulator.getTestResults().reset();
}
@AfterEach
@@ -236,12 +243,9 @@
void testGetEiJobStatus() throws Exception {
putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
putEiJob(EI_TYPE_ID, "jobId");
- String url = ConsumerConsts.API_ROOT + "/eijobs/jobId/status";
- String rsp = restClient().get(url).block();
- assertThat(rsp).contains("ENABLED");
- }
- // Status TBD
+ verifyJobStatus("jobId", "ENABLED");
+ }
@Test
void testDeleteEiJob() throws Exception {
@@ -306,8 +310,8 @@
String url = ConsumerConsts.API_ROOT + "/eijobs/jobId";
// The element with name "property1" is mandatory in the schema
- ConsumerEiJobInfo jobInfo =
- new ConsumerEiJobInfo("typeId", jsonObject("{ \"XXstring\" : \"value\" }"), "owner", "targetUri");
+ ConsumerEiJobInfo jobInfo = new ConsumerEiJobInfo("typeId", jsonObject("{ \"XXstring\" : \"value\" }"), "owner",
+ "targetUri", "jobStatusUrl");
String body = gson.toJson(jobInfo);
testErrorCode(restClient().put(url, body), HttpStatus.CONFLICT, "Json validation failure");
@@ -349,7 +353,7 @@
putEiJob("typeId1", "jobId");
String url = ConsumerConsts.API_ROOT + "/eijobs/jobId";
- String body = gson.toJson(eiJobInfo("typeId2"));
+ String body = gson.toJson(eiJobInfo("typeId2", "jobId"));
testErrorCode(restClient().put(url, body), HttpStatus.CONFLICT,
"Not allowed to change type for existing EI job");
}
@@ -440,17 +444,32 @@
putEiJob(EI_TYPE_ID, "jobId");
assertThat(this.eiJobs.size()).isEqualTo(1);
- String url = ProducerConsts.API_ROOT + "/eiproducers/eiProducerId";
- restClient().deleteForEntity(url).block();
+ deleteEiProducer("eiProducerId");
assertThat(this.eiProducers.size()).isEqualTo(1);
assertThat(this.eiTypes.getType(EI_TYPE_ID).getProducerIds()).doesNotContain("eiProducerId");
- assertThat(this.eiJobs.size()).isEqualTo(1);
+ verifyJobStatus("jobId", "ENABLED");
- String url2 = ProducerConsts.API_ROOT + "/eiproducers/eiProducerId2";
- restClient().deleteForEntity(url2).block();
+ deleteEiProducer("eiProducerId2");
assertThat(this.eiProducers.size()).isZero();
assertThat(this.eiTypes.size()).isZero();
- assertThat(this.eiJobs.size()).isZero();
+ verifyJobStatus("jobId", "DISABLED");
+ }
+
+ @Test
+ void testJobStatusNotifications() throws JsonMappingException, JsonProcessingException, ServiceException {
+ putEiProducerWithOneType("eiProducerId", EI_TYPE_ID);
+ putEiJob(EI_TYPE_ID, "jobId");
+
+ deleteEiProducer("eiProducerId");
+ assertThat(this.eiTypes.size()).isZero(); // The type is gone
+ assertThat(this.eiJobs.size()).isEqualTo(1); // The job remains
+ ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
+ await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(1));
+ assertThat(consumerResults.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
+
+ putEiProducerWithOneType("eiProducerId", EI_TYPE_ID);
+ await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(2));
+ assertThat(consumerResults.status.get(1).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
}
@Test
@@ -469,18 +488,17 @@
assertThat(resp.getBody()).contains(EI_PRODUCER_ID);
}
- private void assertProducerOpState(String producerId,
- ProducerStatusInfo.OperationalState expectedOperationalState) {
- String statusUrl = ProducerConsts.API_ROOT + "/eiproducers/" + producerId + "/status";
- ResponseEntity<String> resp = restClient().getForEntity(statusUrl).block();
- ProducerStatusInfo statusInfo = gson.fromJson(resp.getBody(), ProducerStatusInfo.class);
- assertThat(statusInfo.opState).isEqualTo(expectedOperationalState);
- }
-
@Test
void testProducerSupervision() throws JsonMappingException, JsonProcessingException, ServiceException {
putEiProducerWithOneTypeRejecting("simulateProducerError", EI_TYPE_ID);
+ {
+ // Create a job
+ putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+ putEiJob(EI_TYPE_ID, "jobId");
+ deleteEiProducer(EI_PRODUCER_ID);
+ }
+
assertThat(this.eiProducers.size()).isEqualTo(1);
assertThat(this.eiTypes.size()).isEqualTo(1);
assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.ENABLED);
@@ -490,10 +508,15 @@
assertThat(this.eiProducers.size()).isEqualTo(1);
assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.DISABLED);
- // After 3 failed checks, the producer shall be deregisterred
+ // After 3 failed checks, the producer and the type shall be deregistered
this.producerSupervision.createTask().blockLast();
assertThat(this.eiProducers.size()).isEqualTo(0);
assertThat(this.eiTypes.size()).isEqualTo(0);
+
+ // Job disabled status notification shall be received
+ ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
+ await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(1));
+ assertThat(consumerResults.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
}
@Test
@@ -506,6 +529,25 @@
assertThat(resp.getBody()).contains("hunky dory");
}
+ private void deleteEiProducer(String eiProducerId) { // Sends a delete request for the producer's URL under ProducerConsts.API_ROOT.
+ String url = ProducerConsts.API_ROOT + "/eiproducers/" + eiProducerId;
+ restClient().deleteForEntity(url).block();
+ }
+
+ private void verifyJobStatus(String jobId, String expStatus) { // Asserts that the job's status response contains expStatus.
+ String url = ConsumerConsts.API_ROOT + "/eijobs/" + jobId + "/status";
+ String rsp = restClient().get(url).block();
+ assertThat(rsp).contains(expStatus); // substring match on the returned string
+ }
+
+ private void assertProducerOpState(String producerId,
+ ProducerStatusInfo.OperationalState expectedOperationalState) { // Asserts the opState reported for the producer.
+ String statusUrl = ProducerConsts.API_ROOT + "/eiproducers/" + producerId + "/status";
+ ResponseEntity<String> resp = restClient().getForEntity(statusUrl).block();
+ ProducerStatusInfo statusInfo = gson.fromJson(resp.getBody(), ProducerStatusInfo.class); // response body is parsed as JSON
+ assertThat(statusInfo.opState).isEqualTo(expectedOperationalState);
+ }
+
ProducerEiTypeRegistrationInfo producerEiTypeRegistrationInfo(String typeId)
throws JsonMappingException, JsonProcessingException {
return new ProducerEiTypeRegistrationInfo(jsonSchemaObject(), typeId);
@@ -531,15 +573,16 @@
baseUrl() + ProducerSimulatorController.SUPERVISION_URL);
}
- ConsumerEiJobInfo eiJobInfo() throws JsonMappingException, JsonProcessingException {
- return eiJobInfo(EI_TYPE_ID);
+ private ConsumerEiJobInfo eiJobInfo() throws JsonMappingException, JsonProcessingException {
+ return eiJobInfo(EI_TYPE_ID, EI_JOB_ID); // convenience overload using the EI_TYPE_ID and EI_JOB_ID constants
}
- ConsumerEiJobInfo eiJobInfo(String typeId) throws JsonMappingException, JsonProcessingException {
- return new ConsumerEiJobInfo(typeId, jsonObject(), "owner", "targetUri");
+ ConsumerEiJobInfo eiJobInfo(String typeId, String eiJobId) throws JsonMappingException, JsonProcessingException {
+ return new ConsumerEiJobInfo(typeId, jsonObject(), "owner", "targetUri",
+ baseUrl() + ConsumerSimulatorController.getJobStatusUrl(eiJobId)); // last argument: URL of the consumer simulator's status callback for this job
}
- Object jsonObject(String json) {
+ private Object jsonObject(String json) {
try {
return JsonParser.parseString(json).getAsJsonObject();
} catch (Exception e) {
@@ -547,7 +590,7 @@
}
}
- Object jsonSchemaObject() {
+ private Object jsonSchemaObject() {
// a json schema with one mandatory property named "string"
String schemaStr = "{" //
+ "\"$schema\": \"http://json-schema.org/draft-04/schema#\"," //
@@ -564,7 +607,7 @@
return jsonObject(schemaStr);
}
- Object jsonObject() {
+ private Object jsonObject() { // JSON object with a single property, named by EI_JOB_PROPERTY, set to "value".
return jsonObject("{ " + EI_JOB_PROPERTY + " : \"value\" }");
}
@@ -572,7 +615,7 @@
throws JsonMappingException, JsonProcessingException, ServiceException {
String url = ConsumerConsts.API_ROOT + "/eijobs/" + jobId;
- String body = gson.toJson(eiJobInfo(eiTypeId));
+ String body = gson.toJson(eiJobInfo(eiTypeId, jobId));
restClient().putForEntity(url, body).block();
return this.eiJobs.getJob(jobId);
diff --git a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/controller/ConsumerSimulatorController.java b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/controller/ConsumerSimulatorController.java
new file mode 100644
index 0000000..562f286
--- /dev/null
+++ b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/controller/ConsumerSimulatorController.java
@@ -0,0 +1,83 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2020 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.enrichment.controller;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import lombok.Getter;
+
+import org.oransc.enrichment.controllers.VoidResponse;
+import org.oransc.enrichment.controllers.consumer.ConsumerEiJobStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController("ConsumerSimulatorController")
+@Api(tags = {"Consumer Callbacks"})
+public class ConsumerSimulatorController {
+ // Test-side simulator of an EI consumer: receives EI job status callbacks over REST and records them for the tests.
+ private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ /** Holder for the job statuses received by {@code jobStatusCallback}. */
+ public static class TestResults {
+ /** Received statuses, appended by the callback handler; the list is wrapped by Collections.synchronizedList. */
+ public List<ConsumerEiJobStatus> status = Collections.synchronizedList(new ArrayList<ConsumerEiJobStatus>());
+ /** Discards all recorded statuses. */
+ public void reset() {
+ status.clear();
+ }
+ }
+ /** Results recorded so far; Lombok's {@code @Getter} generates {@code getTestResults()}. */
+ @Getter
+ private TestResults testResults = new TestResults();
+ /** Returns the relative callback path for the given job: /consumer_simulator/eijobs/{eiJobId}/status. */
+ public static String getJobStatusUrl(String eiJobId) {
+ return "/consumer_simulator/eijobs/" + eiJobId + "/status";
+ }
+ /** Handles a POSTed job status: logs it, appends it to {@code testResults.status} and answers 200 OK. */
+ @PostMapping(path = "/consumer_simulator/eijobs/{eiJobId}/status", produces = MediaType.APPLICATION_JSON_VALUE)
+ @ApiOperation(value = "Callback for EI job status", notes = "")
+ @ApiResponses(
+ value = { //
+ @ApiResponse(code = 200, message = "OK", response = VoidResponse.class)} //
+ )
+ public ResponseEntity<Object> jobStatusCallback( //
+ @PathVariable("eiJobId") String eiJobId, //
+ @RequestBody ConsumerEiJobStatus status) {
+ logger.info("Job status callback status: {} eiJobId: {}", status.state, eiJobId);
+ this.testResults.status.add(status);
+ return new ResponseEntity<>(HttpStatus.OK);
+ }
+
+}
diff --git a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/controller/ProducerSimulatorController.java b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/controller/ProducerSimulatorController.java
index 4c57abd..cc61d21 100644
--- a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/controller/ProducerSimulatorController.java
+++ b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/controller/ProducerSimulatorController.java
@@ -32,9 +32,9 @@
import lombok.Getter;
-import org.oransc.enrichment.clients.ProducerJobInfo;
import org.oransc.enrichment.controllers.ErrorResponse;
import org.oransc.enrichment.controllers.VoidResponse;
+import org.oransc.enrichment.controllers.producer.ProducerJobInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;