NONRTRIC - Using S3 storage for ICS
Change-Id: I7498b18f414d328094ad2cf1da093e74220e4bbb
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-810
diff --git a/config/application.yaml b/config/application.yaml
index fcfc672..ce6b1ef 100644
--- a/config/application.yaml
+++ b/config/application.yaml
@@ -67,3 +67,9 @@
vardata-directory: /var/information-coordinator-service
# If the file name is empty, no authorization token is used
auth-token-file:
+ # S3 object store usage is enabled by defining the bucket to use. This will override the vardata-directory parameter.
+ s3:
+ endpointOverride: http://localhost:9000
+ accessKeyId: minio
+ secretAccessKey: miniostorage
+ bucket:
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 7451773..e89058a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,7 +60,7 @@
<jacoco-maven-plugin.version>0.8.5</jacoco-maven-plugin.version>
<exec.skip>true</exec.skip>
</properties>
- <dependencies>
+ <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
@@ -135,6 +135,16 @@
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3</artifactId>
+ <version>2.17.292</version>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk</artifactId>
+ <version>1.12.321</version>
+ </dependency>
<!-- TEST -->
<dependency>
<groupId>org.springdoc</groupId>
@@ -346,4 +356,4 @@
<system>JIRA</system>
<url>https://jira.o-ran-sc.org/</url>
</issueManagement>
-</project>
+</project>
\ No newline at end of file
diff --git a/src/main/java/org/oransc/ics/BeanFactory.java b/src/main/java/org/oransc/ics/BeanFactory.java
index ac6190f..233be90 100644
--- a/src/main/java/org/oransc/ics/BeanFactory.java
+++ b/src/main/java/org/oransc/ics/BeanFactory.java
@@ -29,6 +29,7 @@
import org.oransc.ics.configuration.ApplicationConfig;
import org.oransc.ics.controllers.r1producer.ProducerCallbacks;
import org.oransc.ics.repository.InfoJobs;
+import org.oransc.ics.repository.InfoTypeSubscriptions;
import org.oransc.ics.repository.InfoTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,11 +70,7 @@
public InfoJobs infoJobs(SecurityContext securityContext, InfoTypes types) {
if (infoJobs == null) {
infoJobs = new InfoJobs(getApplicationConfig(), types, producerCallbacks(securityContext));
- try {
- infoJobs.restoreJobsFromDatabase();
- } catch (Exception e) {
- logger.error("Could not restore jobs from database: {}", e.getMessage());
- }
+ infoJobs.restoreJobsFromDatabase().subscribe();
}
return infoJobs;
}
@@ -83,7 +80,7 @@
if (this.infoTypes == null) {
infoTypes = new InfoTypes(getApplicationConfig());
try {
- infoTypes.restoreTypesFromDatabase();
+ infoTypes.restoreTypesFromDatabase().blockLast();
} catch (Exception e) {
logger.error("Could not restore Information Types from database: {}", e.getMessage());
}
@@ -92,6 +89,13 @@
}
@Bean
+ public InfoTypeSubscriptions infoTypeSubscriptions() {
+ InfoTypeSubscriptions s = new InfoTypeSubscriptions(getApplicationConfig());
+ s.restoreFromDatabase().subscribe();
+ return s;
+ }
+
+ @Bean
public ProducerCallbacks producerCallbacks(SecurityContext securityContext) {
if (this.producerCallbacks == null) {
producerCallbacks = new ProducerCallbacks(getApplicationConfig(), securityContext);
diff --git a/src/main/java/org/oransc/ics/configuration/ApplicationConfig.java b/src/main/java/org/oransc/ics/configuration/ApplicationConfig.java
index 50c6daa..6f48cd3 100644
--- a/src/main/java/org/oransc/ics/configuration/ApplicationConfig.java
+++ b/src/main/java/org/oransc/ics/configuration/ApplicationConfig.java
@@ -66,6 +66,22 @@
@Value("${app.webclient.http.proxy-port:0}")
private int httpProxyPort = 0;
+ @Getter
+ @Value("${app.s3.endpointOverride:}")
+ private String s3EndpointOverride;
+
+ @Getter
+ @Value("${app.s3.accessKeyId:}")
+ private String s3AccessKeyId;
+
+ @Getter
+ @Value("${app.s3.secretAccessKey:}")
+ private String s3SecretAccessKey;
+
+ @Getter
+ @Value("${app.s3.bucket:}")
+ private String s3Bucket;
+
private WebClientConfig webClientConfig = null;
public WebClientConfig getWebClientConfig() {
@@ -93,4 +109,8 @@
return this.webClientConfig;
}
+ public boolean isS3Enabled() {
+ return !(s3EndpointOverride.isBlank() || s3Bucket.isBlank());
+ }
+
}
diff --git a/src/main/java/org/oransc/ics/controllers/a1e/A1eCallbacks.java b/src/main/java/org/oransc/ics/controllers/a1e/A1eCallbacks.java
index 15394ab..82b4c37 100644
--- a/src/main/java/org/oransc/ics/controllers/a1e/A1eCallbacks.java
+++ b/src/main/java/org/oransc/ics/controllers/a1e/A1eCallbacks.java
@@ -73,9 +73,9 @@
private Mono<String> noifyStatusToJobOwner(InfoJob job, InfoProducers eiProducers) {
boolean isJobEnabled = eiProducers.isJobEnabled(job);
- A1eEiJobStatus status = isJobEnabled ? new A1eEiJobStatus(A1eEiJobStatus.EiJobStatusValues.ENABLED)
+ A1eEiJobStatus jobStatus = isJobEnabled ? new A1eEiJobStatus(A1eEiJobStatus.EiJobStatusValues.ENABLED)
: new A1eEiJobStatus(A1eEiJobStatus.EiJobStatusValues.DISABLED);
- String body = gson.toJson(status);
+ String body = gson.toJson(jobStatus);
return this.restClient.post(job.getJobStatusUrl(), body) //
.doOnNext(response -> logger.debug("Consumer notified OK {}", job.getId())) //
.doOnNext(response -> job.setLastReportedStatus(isJobEnabled)) //
diff --git a/src/main/java/org/oransc/ics/datastore/DataStore.java b/src/main/java/org/oransc/ics/datastore/DataStore.java
new file mode 100644
index 0000000..78d5d13
--- /dev/null
+++ b/src/main/java/org/oransc/ics/datastore/DataStore.java
@@ -0,0 +1,48 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.ics.datastore;
+
+import org.oransc.ics.configuration.ApplicationConfig;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public interface DataStore {
+
+ public Flux<String> listObjects(String prefix);
+
+ public Mono<byte[]> readObject(String fileName);
+
+ public Mono<byte[]> writeObject(String fileName, byte[] fileData);
+
+ public Mono<Boolean> deleteObject(String name);
+
+ public Mono<String> createDataStore();
+
+ public Mono<String> deleteAllData();
+
+ public Mono<String> deleteBucket();
+
+ static DataStore create(ApplicationConfig config, String location) {
+ return config.isS3Enabled() ? new S3ObjectStore(config, location) : new FileStore(config, location);
+ }
+
+}
diff --git a/src/main/java/org/oransc/ics/datastore/FileStore.java b/src/main/java/org/oransc/ics/datastore/FileStore.java
new file mode 100644
index 0000000..bd9c91c
--- /dev/null
+++ b/src/main/java/org/oransc/ics/datastore/FileStore.java
@@ -0,0 +1,158 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.ics.datastore;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.oransc.ics.configuration.ApplicationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.FileSystemUtils;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+class FileStore implements DataStore {
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ ApplicationConfig applicationConfig;
+ private final String location;
+
+ public FileStore(ApplicationConfig applicationConfig, String location) {
+ this.applicationConfig = applicationConfig;
+ this.location = location;
+ }
+
+ @Override
+ public Flux<String> listObjects(String prefix) {
+ Path root = Path.of(path().toString(), prefix);
+ if (!root.toFile().exists()) {
+ root = root.getParent();
+ }
+
+ logger.debug("Listing files in: {}", root);
+
+ List<String> result = new ArrayList<>();
+ try (Stream<Path> stream = Files.walk(root, Integer.MAX_VALUE)) {
+
+ stream.forEach(path -> filterListFiles(path, prefix, result));
+
+ return Flux.fromIterable(result);
+ } catch (Exception e) {
+ return Flux.error(e);
+ }
+ }
+
+ private void filterListFiles(Path path, String prefix, List<String> result) {
+ if (path.toFile().isFile() && externalName(path).startsWith(prefix)) {
+ result.add(externalName(path));
+ } else {
+ logger.debug("Ignoring file/directory {}, prefix: {}", path, prefix);
+ }
+ }
+
+ private String externalName(Path path) {
+ String fullName = path.toString();
+ String externalName = fullName.substring(path().toString().length());
+ if (externalName.startsWith("/")) {
+ externalName = externalName.substring(1);
+ }
+ return externalName;
+ }
+
+ @Override
+ public Mono<byte[]> readObject(String fileName) {
+ try {
+ byte[] contents = Files.readAllBytes(path(fileName));
+ return Mono.just(contents);
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+ }
+
+ @Override
+ public Mono<Boolean> deleteObject(String name) {
+ try {
+ Files.delete(path(name));
+ return Mono.just(true);
+ } catch (Exception e) {
+ return Mono.just(false);
+ }
+ }
+
+ @Override
+ public Mono<String> createDataStore() {
+ try {
+ Files.createDirectories(path());
+ } catch (IOException e) {
+ logger.error("Could not create directory: {}, reason: {}", path(), e.getMessage());
+ }
+ return Mono.just("OK");
+ }
+
+ private Path path(String name) {
+ return Path.of(path().toString(), name);
+ }
+
+ private Path path() {
+ return Path.of(applicationConfig.getVardataDirectory(), "database", this.location);
+ }
+
+ @Override
+ public Mono<String> deleteAllData() {
+ return listObjects("") //
+ .flatMap(this::deleteObject) //
+ .collectList() //
+ .map(o -> "OK");
+ }
+
+ @Override
+ public Mono<byte[]> writeObject(String fileName, byte[] fileData) {
+ try {
+ File outputFile = path(fileName).toFile();
+ try (FileOutputStream outputStream = new FileOutputStream(outputFile)) {
+ outputStream.write(fileData);
+ }
+ } catch (IOException e) {
+ logger.debug("Could not write file: {}, reason; {}", path(fileName), e.getMessage());
+ }
+ return Mono.just(fileData);
+ }
+
+ @Override
+ public Mono<String> deleteBucket() {
+ try {
+ FileSystemUtils.deleteRecursively(path(""));
+ } catch (IOException e) {
+ logger.warn("Could not delete: {}, reason: {}", path(""), e.getMessage());
+ }
+ return Mono.just(path("").toString());
+ }
+
+}
diff --git a/src/main/java/org/oransc/ics/datastore/S3ObjectStore.java b/src/main/java/org/oransc/ics/datastore/S3ObjectStore.java
new file mode 100644
index 0000000..3434b67
--- /dev/null
+++ b/src/main/java/org/oransc/ics/datastore/S3ObjectStore.java
@@ -0,0 +1,232 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.ics.datastore;
+
+import java.net.URI;
+import java.util.concurrent.CompletableFuture;
+
+import org.oransc.ics.configuration.ApplicationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+class S3ObjectStore implements DataStore {
+ private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
+ private final ApplicationConfig applicationConfig;
+
+ private static S3AsyncClient s3AsynchClient;
+ private final String location;
+
+ public S3ObjectStore(ApplicationConfig applicationConfig, String location) {
+ this.applicationConfig = applicationConfig;
+ this.location = location;
+
+ getS3AsynchClient(applicationConfig);
+ }
+
+ private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) {
+ if (applicationConfig.isS3Enabled() && s3AsynchClient == null) {
+ s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build();
+ }
+ return s3AsynchClient;
+ }
+
+ private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) {
+ URI uri = URI.create(applicationConfig.getS3EndpointOverride());
+ return S3AsyncClient.builder() //
+ .region(Region.US_EAST_1) //
+ .endpointOverride(uri) //
+ .credentialsProvider(StaticCredentialsProvider.create( //
+ AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
+ applicationConfig.getS3SecretAccessKey())));
+ }
+
+ @Override
+ public Flux<String> listObjects(String prefix) {
+ return listObjectsInBucket(bucket(), location + "/" + prefix).map(S3Object::key) //
+ .map(this::externalName);
+ }
+
+ @Override
+ public Mono<Boolean> deleteObject(String name) {
+ DeleteObjectRequest request = DeleteObjectRequest.builder() //
+ .bucket(bucket()) //
+ .key(key(name)) //
+ .build();
+
+ CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request);
+
+ return Mono.fromFuture(future).map(resp -> true);
+ }
+
+ @Override
+ public Mono<byte[]> readObject(String fileName) {
+ return getDataFromS3Object(bucket(), fileName);
+ }
+
+ @Override
+ public Mono<byte[]> writeObject(String fileName, byte[] fileData) {
+
+ PutObjectRequest request = PutObjectRequest.builder() //
+ .bucket(bucket()) //
+ .key(key(fileName)) //
+ .build();
+
+ AsyncRequestBody body = AsyncRequestBody.fromBytes(fileData);
+
+ CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
+
+ return Mono.fromFuture(future) //
+ .map(putObjectResponse -> fileData) //
+ .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
+ }
+
+ @Override
+ public Mono<String> createDataStore() {
+ return createS3Bucket(bucket());
+ }
+
+ private Mono<String> createS3Bucket(String s3Bucket) {
+
+ CreateBucketRequest request = CreateBucketRequest.builder() //
+ .bucket(s3Bucket) //
+ .build();
+
+ CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request);
+
+ return Mono.fromFuture(future) //
+ .map(f -> s3Bucket) //
+ .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
+ .onErrorResume(t -> Mono.just(s3Bucket));
+ }
+
+ @Override
+ public Mono<String> deleteAllData() {
+ return listObjects("") //
+ .flatMap(key -> deleteObject(key)) //
+ .collectList() //
+ .map(resp -> "OK")
+ .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(), t.getMessage()))
+ .onErrorResume(t -> Mono.just("NOK"));
+ }
+
+ @Override
+ public Mono<String> deleteBucket() {
+ return deleteBucketFromS3Storage()
+ .doOnError(t -> logger.warn("Could not delete: {}, reason: {}", bucket(), t.getMessage()))
+ .map(x -> bucket()).onErrorResume(t -> Mono.just(bucket()));
+ }
+
+ private Mono<DeleteBucketResponse> deleteBucketFromS3Storage() {
+ DeleteBucketRequest request = DeleteBucketRequest.builder() //
+ .bucket(bucket()) //
+ .build();
+
+ CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request);
+
+ return Mono.fromFuture(future);
+ }
+
+ private String bucket() {
+ return applicationConfig.getS3Bucket();
+ }
+
+ private Mono<ListObjectsResponse> listObjectsRequest(String bucket, String prefix,
+ ListObjectsResponse prevResponse) {
+ ListObjectsRequest.Builder builder = ListObjectsRequest.builder() //
+ .bucket(bucket) //
+ .maxKeys(1000) //
+ .prefix(prefix);
+
+ if (prevResponse != null) {
+ if (Boolean.TRUE.equals(prevResponse.isTruncated())) {
+ builder.marker(prevResponse.nextMarker());
+ } else {
+ return Mono.empty();
+ }
+ }
+
+ ListObjectsRequest listObjectsRequest = builder.build();
+ CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
+ return Mono.fromFuture(future);
+ }
+
+ private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
+
+ return listObjectsRequest(bucket, prefix, null) //
+ .expand(response -> listObjectsRequest(bucket, prefix, response)) //
+ .map(ListObjectsResponse::contents) //
+ .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
+ .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
+ .flatMap(Flux::fromIterable) //
+ .doOnNext(obj -> logger.debug("Found object: {}", obj.key()));
+ }
+
+ private Mono<byte[]> getDataFromS3Object(String bucket, String fileName) {
+
+ GetObjectRequest request = GetObjectRequest.builder() //
+ .bucket(bucket) //
+ .key(key(fileName)) //
+ .build();
+
+ CompletableFuture<ResponseBytes<GetObjectResponse>> future =
+ s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
+
+ return Mono.fromFuture(future) //
+ .map(b -> b.asByteArray()) //
+ .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key(fileName), bucket,
+ t.getMessage())) //
+ .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key(fileName))) //
+ .onErrorResume(t -> Mono.empty());
+ }
+
+ private String key(String fileName) {
+ return location + "/" + fileName;
+ }
+
+ private String externalName(String internalName) {
+ return internalName.substring(key("").length());
+ }
+
+}
diff --git a/src/main/java/org/oransc/ics/repository/InfoJobs.java b/src/main/java/org/oransc/ics/repository/InfoJobs.java
index 8f8e0e9..8085573 100644
--- a/src/main/java/org/oransc/ics/repository/InfoJobs.java
+++ b/src/main/java/org/oransc/ics/repository/InfoJobs.java
@@ -24,27 +24,22 @@
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapterFactory;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
import java.lang.invoke.MethodHandles;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Vector;
import org.oransc.ics.configuration.ApplicationConfig;
import org.oransc.ics.controllers.r1producer.ProducerCallbacks;
+import org.oransc.ics.datastore.DataStore;
import org.oransc.ics.exceptions.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
-import org.springframework.util.FileSystemUtils;
+import reactor.core.publisher.Flux;
/**
* Dynamic representation of all existing Information Jobs.
@@ -62,6 +57,8 @@
private final ProducerCallbacks producerCallbacks;
+ private final DataStore dataStore;
+
public InfoJobs(ApplicationConfig config, InfoTypes infoTypes, ProducerCallbacks producerCallbacks) {
this.config = config;
GsonBuilder gsonBuilder = new GsonBuilder();
@@ -69,40 +66,47 @@
this.gson = gsonBuilder.create();
this.producerCallbacks = producerCallbacks;
this.infoTypes = infoTypes;
+ this.dataStore = DataStore.create(config, "infojobs");
+ this.dataStore.createDataStore().subscribe();
}
- public synchronized void restoreJobsFromDatabase() throws IOException {
- Files.createDirectories(Paths.get(getDatabaseDirectory()));
- File dbDir = new File(getDatabaseDirectory());
+ public synchronized Flux<InfoJob> restoreJobsFromDatabase() {
+ return dataStore.listObjects("") //
+ .flatMap(dataStore::readObject) //
+ .map(this::toPersistentData) //
+ .map(this::toInfoJob) //
+ .filter(Objects::nonNull) //
+ .doOnNext(this::doPut) //
+ .doOnError(t -> logger.error("Could not restore jobs from datastore, reason: {}", t.getMessage()));
+ }
- for (File file : dbDir.listFiles()) {
- String json = Files.readString(file.toPath());
- InfoJob.PersistentData data = gson.fromJson(json, InfoJob.PersistentData.class);
- try {
- InfoJob job = toInfoJob(data);
- this.doPut(job);
- } catch (ServiceException e) {
- logger.warn("Could not restore job:{},reason: {}", data.getId(), e.getMessage());
- }
+ private InfoJob.PersistentData toPersistentData(byte[] bytes) {
+ String json = new String(bytes);
+ return gson.fromJson(json, InfoJob.PersistentData.class);
+ }
+
+ private InfoJob toInfoJob(InfoJob.PersistentData data) {
+ InfoType type;
+ try {
+ type = infoTypes.getType(data.getTypeId());
+ return InfoJob.builder() //
+ .id(data.getId()) //
+ .type(type) //
+ .owner(data.getOwner()) //
+ .jobData(data.getJobData()) //
+ .targetUrl(data.getTargetUrl()) //
+ .jobStatusUrl(data.getJobStatusUrl()) //
+ .lastUpdated(data.getLastUpdated()) //
+ .build();
+ } catch (ServiceException e) {
+ logger.error("Error restoring info job: {}, reason: {}", data.getId(), e.getMessage());
}
- }
-
- private InfoJob toInfoJob(InfoJob.PersistentData data) throws ServiceException {
- InfoType type = infoTypes.getType(data.getTypeId());
- return InfoJob.builder() //
- .id(data.getId()) //
- .type(type) //
- .owner(data.getOwner()) //
- .jobData(data.getJobData()) //
- .targetUrl(data.getTargetUrl()) //
- .jobStatusUrl(data.getJobStatusUrl()) //
- .lastUpdated(data.getLastUpdated()) //
- .build();
+ return null;
}
public synchronized void put(InfoJob job) {
this.doPut(job);
- storeJobInFile(job);
+ storeJob(job);
}
public synchronized Collection<InfoJob> getJobs() {
@@ -146,11 +150,8 @@
jobsByType.remove(job.getType().getId(), job);
jobsByOwner.remove(job.getOwner(), job);
- try {
- Files.delete(getPath(job));
- } catch (IOException e) {
- logger.warn("Could not remove file: {}", e.getMessage());
- }
+ this.dataStore.deleteObject(getPath(job)).subscribe();
+
this.producerCallbacks.stopInfoJob(job, infoProducers);
}
@@ -163,16 +164,8 @@
this.allEiJobs.clear();
this.jobsByType.clear();
jobsByOwner.clear();
- clearDatabase();
- }
- private void clearDatabase() {
- try {
- FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
- Files.createDirectories(Paths.get(getDatabaseDirectory()));
- } catch (IOException e) {
- logger.warn("Could not delete database : {}", e.getMessage());
- }
+ dataStore.deleteAllData().flatMap(s -> dataStore.createDataStore()).block();
}
private void doPut(InfoJob job) {
@@ -181,26 +174,16 @@
jobsByOwner.put(job.getOwner(), job);
}
- private void storeJobInFile(InfoJob job) {
- try {
- try (PrintStream out = new PrintStream(new FileOutputStream(getFile(job)))) {
- out.print(gson.toJson(job.getPersistentData()));
- }
- } catch (Exception e) {
- logger.warn("Could not store job: {} {}", job.getId(), e.getMessage());
- }
+ private void storeJob(InfoJob job) {
+ String json = gson.toJson(job.getPersistentData());
+ byte[] bytes = json.getBytes();
+ this.dataStore.writeObject(this.getPath(job), bytes) //
+ .doOnError(t -> logger.error("Could not store job in datastore, reason: {}", t.getMessage())) //
+ .subscribe();
}
- private File getFile(InfoJob job) {
- return getPath(job).toFile();
- }
-
- private Path getPath(InfoJob job) {
- return Path.of(getDatabaseDirectory(), job.getId());
- }
-
- private String getDatabaseDirectory() {
- return config.getVardataDirectory() + "/database/eijobs";
+ private String getPath(InfoJob job) {
+ return job.getId();
}
}
diff --git a/src/main/java/org/oransc/ics/repository/InfoProducers.java b/src/main/java/org/oransc/ics/repository/InfoProducers.java
index 4ea881f..e8e206a 100644
--- a/src/main/java/org/oransc/ics/repository/InfoProducers.java
+++ b/src/main/java/org/oransc/ics/repository/InfoProducers.java
@@ -135,9 +135,8 @@
}
public synchronized boolean isJobEnabled(InfoJob job) {
- InfoType type;
try {
- type = this.infoTypes.getType(job.getType().getId());
+ InfoType type = this.infoTypes.getType(job.getType().getId());
for (InfoProducer producer : this.getProducersSupportingType(type)) {
if (producer.isJobEnabled(job)) {
diff --git a/src/main/java/org/oransc/ics/repository/InfoTypeSubscriptions.java b/src/main/java/org/oransc/ics/repository/InfoTypeSubscriptions.java
index 2557f3c..97fa487 100644
--- a/src/main/java/org/oransc/ics/repository/InfoTypeSubscriptions.java
+++ b/src/main/java/org/oransc/ics/repository/InfoTypeSubscriptions.java
@@ -23,18 +23,12 @@
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
import java.lang.invoke.MethodHandles;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.Vector;
import java.util.function.Function;
@@ -42,13 +36,13 @@
import lombok.Getter;
import org.oransc.ics.configuration.ApplicationConfig;
+import org.oransc.ics.datastore.DataStore;
import org.oransc.ics.exceptions.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
-import org.springframework.util.FileSystemUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -66,6 +60,7 @@
private final Gson gson = new GsonBuilder().create();
private final ApplicationConfig config;
private final Map<String, ConsumerCallbackHandler> callbackHandlers = new HashMap<>();
+ private final DataStore dataStore;
public interface ConsumerCallbackHandler {
Mono<String> notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo);
@@ -96,17 +91,12 @@
}
return this.id.equals(o);
}
-
}
public InfoTypeSubscriptions(@Autowired ApplicationConfig config) {
this.config = config;
-
- try {
- this.restoreFromDatabase();
- } catch (IOException e) {
- logger.error("Could not restore info type subscriptions from database {}", this.getDatabaseDirectory());
- }
+ this.dataStore = DataStore.create(config, "infotypesubscriptions");
+ this.dataStore.createDataStore().subscribe();
}
public void registerCallbackhandler(ConsumerCallbackHandler handler, String apiVersion) {
@@ -162,12 +152,7 @@
public void remove(SubscriptionInfo subscription) {
allSubscriptions.remove(subscription.getId());
subscriptionsByOwner.remove(subscription.owner, subscription);
-
- try {
- Files.delete(getPath(subscription));
- } catch (Exception e) {
- logger.debug("Could not delete subscription from database: {}", e.getMessage());
- }
+ dataStore.deleteObject(getPath(subscription)).subscribe();
logger.debug("Removed type status subscription {}", subscription.id);
}
@@ -247,34 +232,28 @@
}
private void clearDatabase() {
- try {
- FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
- Files.createDirectories(Paths.get(getDatabaseDirectory()));
- } catch (IOException e) {
- logger.warn("Could not delete database : {}", e.getMessage());
- }
+ this.dataStore.deleteAllData().block();
}
private void storeInFile(SubscriptionInfo subscription) {
- try {
- try (PrintStream out = new PrintStream(new FileOutputStream(getFile(subscription)))) {
- String json = gson.toJson(subscription);
- out.print(json);
- }
- } catch (Exception e) {
- logger.warn("Could not save subscription: {} {}", subscription.getId(), e.getMessage());
- }
+ String json = gson.toJson(subscription);
+ byte[] bytes = json.getBytes();
+ this.dataStore.writeObject(this.getPath(subscription), bytes)
+ .doOnError(t -> logger.error("Could not store infotype subscription, reason: {}", t.getMessage())) //
+ .subscribe();
}
- public synchronized void restoreFromDatabase() throws IOException {
- Files.createDirectories(Paths.get(getDatabaseDirectory()));
- File dbDir = new File(getDatabaseDirectory());
+ public synchronized Flux<SubscriptionInfo> restoreFromDatabase() {
+ return dataStore.listObjects("") //
+ .flatMap(dataStore::readObject) //
+ .map(this::toSubscriptionInfo) //
+ .filter(Objects::nonNull) //
+ .doOnNext(this::doPut);//
+ }
- for (File file : dbDir.listFiles()) {
- String json = Files.readString(file.toPath());
- SubscriptionInfo subscription = gson.fromJson(json, SubscriptionInfo.class);
- doPut(subscription);
- }
+ private SubscriptionInfo toSubscriptionInfo(byte[] bytes) {
+ String json = new String(bytes);
+ return gson.fromJson(json, SubscriptionInfo.class);
}
private void doPut(SubscriptionInfo subscription) {
@@ -282,20 +261,8 @@
subscriptionsByOwner.put(subscription.owner, subscription);
}
- private File getFile(SubscriptionInfo subscription) {
- return getPath(subscription).toFile();
- }
-
- private Path getPath(SubscriptionInfo subscription) {
- return getPath(subscription.getId());
- }
-
- private Path getPath(String subscriptionId) {
- return Path.of(getDatabaseDirectory(), subscriptionId);
- }
-
- private String getDatabaseDirectory() {
- return config.getVardataDirectory() + "/database/infotypesubscriptions";
+ private String getPath(SubscriptionInfo subscription) {
+ return subscription.getId();
}
}
diff --git a/src/main/java/org/oransc/ics/repository/InfoTypes.java b/src/main/java/org/oransc/ics/repository/InfoTypes.java
index 110bca2..98188ff 100644
--- a/src/main/java/org/oransc/ics/repository/InfoTypes.java
+++ b/src/main/java/org/oransc/ics/repository/InfoTypes.java
@@ -24,26 +24,21 @@
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapterFactory;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
import java.lang.invoke.MethodHandles;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Vector;
import org.oransc.ics.configuration.ApplicationConfig;
+import org.oransc.ics.datastore.DataStore;
import org.oransc.ics.exceptions.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
-import org.springframework.util.FileSystemUtils;
+import reactor.core.publisher.Flux;
/**
* Dynamic representation of all Information Types in the system.
@@ -54,24 +49,30 @@
private final Map<String, InfoType> allInfoTypes = new HashMap<>();
private final ApplicationConfig config;
private final Gson gson;
+ private final DataStore dataStore;
public InfoTypes(ApplicationConfig config) {
this.config = config;
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
this.gson = gsonBuilder.create();
+
+ this.dataStore = DataStore.create(config, "infotypes");
+ this.dataStore.createDataStore().subscribe();
}
- public synchronized void restoreTypesFromDatabase() throws IOException {
- Files.createDirectories(Paths.get(getDatabaseDirectory()));
- File dbDir = new File(getDatabaseDirectory());
+ public synchronized Flux<InfoType> restoreTypesFromDatabase() {
+ return dataStore.listObjects("") //
+ .flatMap(dataStore::readObject) //
+ .map(this::toInfoType) //
+ .filter(Objects::nonNull) //
+ .doOnNext(type -> allInfoTypes.put(type.getId(), type)) //
+ .doOnError(t -> logger.error("Could not restore types from datastore, reason: {}", t.getMessage()));
+ }
- for (File file : dbDir.listFiles()) {
- String json = Files.readString(file.toPath());
- InfoType.PersistentInfo storedData = gson.fromJson(json, InfoType.PersistentInfo.class);
- InfoType type = new InfoType(storedData);
- allInfoTypes.put(type.getId(), type);
- }
+ private InfoType toInfoType(byte[] bytes) {
+ String json = new String(bytes);
+ return gson.fromJson(json, InfoType.class);
}
public synchronized void put(InfoType type) {
@@ -97,11 +98,7 @@
public synchronized void remove(InfoType type) {
allInfoTypes.remove(type.getId());
- try {
- Files.delete(getPath(type));
- } catch (IOException e) {
- logger.warn("Could not remove file: {} {}", type.getId(), e.getMessage());
- }
+ dataStore.deleteObject(getPath(type)).block();
}
public synchronized int size() {
@@ -110,7 +107,7 @@
public synchronized void clear() {
this.allInfoTypes.clear();
- clearDatabase();
+ dataStore.deleteAllData().flatMap(s -> dataStore.createDataStore()).block();
}
public synchronized InfoType getCompatibleType(String typeId) throws ServiceException {
@@ -127,38 +124,15 @@
return compatibleTypes.iterator().next();
}
- private void clearDatabase() {
- try {
- FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
- Files.createDirectories(Paths.get(getDatabaseDirectory()));
- } catch (IOException e) {
- logger.warn("Could not delete database : {}", e.getMessage());
- }
- }
-
private void storeInFile(InfoType type) {
- try {
- try (PrintStream out = new PrintStream(new FileOutputStream(getFile(type)))) {
- out.print(gson.toJson(type.getPersistentInfo()));
- }
- } catch (Exception e) {
- logger.warn("Could not save type: {} {}", type.getId(), e.getMessage());
- }
+ String json = gson.toJson(type);
+ byte[] bytes = json.getBytes();
+ this.dataStore.writeObject(this.getPath(type), bytes)
+ .doOnError(t -> logger.error("Could not store infotype in datastore, reason: {}", t.getMessage())) //
+ .subscribe();
}
- private File getFile(InfoType type) {
- return getPath(type).toFile();
- }
-
- private Path getPath(InfoType type) {
- return getPath(type.getId());
- }
-
- private Path getPath(String typeId) {
- return Path.of(getDatabaseDirectory(), typeId);
- }
-
- private String getDatabaseDirectory() {
- return config.getVardataDirectory() + "/database/eitypes";
+ private String getPath(InfoType type) {
+ return type.getId();
}
}
diff --git a/src/test/java/org/oransc/ics/ApplicationTest.java b/src/test/java/org/oransc/ics/ApplicationTest.java
index 77382b7..2f7c1bf 100644
--- a/src/test/java/org/oransc/ics/ApplicationTest.java
+++ b/src/test/java/org/oransc/ics/ApplicationTest.java
@@ -37,6 +37,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import org.json.JSONObject;
@@ -68,6 +69,7 @@
import org.oransc.ics.controllers.r1producer.ProducerJobInfo;
import org.oransc.ics.controllers.r1producer.ProducerRegistrationInfo;
import org.oransc.ics.controllers.r1producer.ProducerStatusInfo;
+import org.oransc.ics.datastore.DataStore;
import org.oransc.ics.exceptions.ServiceException;
import org.oransc.ics.repository.InfoJob;
import org.oransc.ics.repository.InfoJobs;
@@ -103,7 +105,9 @@
"server.ssl.key-store=./config/keystore.jks", //
"app.webclient.trust-store=./config/truststore.jks", //
"app.webclient.trust-store-used=true", //
- "app.vardata-directory=./target"})
+ "app.vardata-directory=/tmp/ics", //
+ "app.s3.bucket=" // If this is set, S3 will be used to store data.
+ })
class ApplicationTest {
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -983,18 +987,37 @@
}
@Test
+ void testDb() {
+ DataStore db = DataStore.create(this.applicationConfig, "test");
+ db.createDataStore().block();
+ final int NO_OF_OBJS = 1200;
+ for (int i = 0; i < NO_OF_OBJS; ++i) {
+ String data = "data";
+ db.writeObject("Obj_" + i, data.getBytes()).block();
+ }
+
+ List<?> entries = db.listObjects("").collectList().block();
+ assertThat(entries).hasSize(NO_OF_OBJS);
+
+ db.listObjects("").doOnNext(name -> logger.debug("deleted {}", name)).flatMap(name -> db.deleteObject(name))
+ .blockLast();
+
+ db.createDataStore().block();
+
+ }
+
+ @Test
void testJobDatabasePersistence() throws Exception {
putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
putInfoJob(TYPE_ID, "jobId1");
putInfoJob(TYPE_ID, "jobId2");
assertThat(this.infoJobs.size()).isEqualTo(2);
-
{
InfoJob savedJob = this.infoJobs.getJob("jobId1");
// Restore the jobs
InfoJobs jobs = new InfoJobs(this.applicationConfig, this.infoTypes, this.producerCallbacks);
- jobs.restoreJobsFromDatabase();
+ jobs.restoreJobsFromDatabase().blockLast();
assertThat(jobs.size()).isEqualTo(2);
InfoJob restoredJob = jobs.getJob("jobId1");
assertThat(restoredJob.getPersistentData()).isEqualTo(savedJob.getPersistentData());
@@ -1007,7 +1030,7 @@
{
// Restore the jobs, no jobs in database
InfoJobs jobs = new InfoJobs(this.applicationConfig, this.infoTypes, this.producerCallbacks);
- jobs.restoreJobsFromDatabase();
+ jobs.restoreJobsFromDatabase().blockLast();
assertThat(jobs.size()).isZero();
}
logger.warn("Test removing a job when the db file is gone");
@@ -1028,7 +1051,7 @@
{
// Restore the types
InfoTypes restoredTypes = new InfoTypes(this.applicationConfig);
- restoredTypes.restoreTypesFromDatabase();
+ restoredTypes.restoreTypesFromDatabase().blockLast();
InfoType restoredType = restoredTypes.getType(TYPE_ID);
assertThat(restoredType.getPersistentInfo()).isEqualTo(savedType.getPersistentInfo());
assertThat(restoredTypes.size()).isEqualTo(1);
@@ -1037,7 +1060,7 @@
// Restore the jobs, no jobs in database
InfoTypes restoredTypes = new InfoTypes(this.applicationConfig);
restoredTypes.clear();
- restoredTypes.restoreTypesFromDatabase();
+ restoredTypes.restoreTypesFromDatabase().blockLast();
assertThat(restoredTypes.size()).isZero();
}
logger.warn("Test removing a job when the db file is gone");
@@ -1046,7 +1069,7 @@
}
@Test
- void testConsumerTypeSubscriptionDatabase() {
+ void testConsumerTypeSubscriptionDatabase() throws Exception {
final String callbackUrl = baseUrl() + ConsumerSimulatorController.getTypeStatusCallbackUrl();
final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(callbackUrl, "owner");
@@ -1055,7 +1078,11 @@
restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
+ if (this.applicationConfig.isS3Enabled()) {
+ Thread.sleep(1000); // Storing in S3 is asynch, so it can take some millis
+ }
InfoTypeSubscriptions restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig);
+ restoredSubscriptions.restoreFromDatabase().blockLast();
assertThat(restoredSubscriptions.size()).isEqualTo(1);
assertThat(restoredSubscriptions.getSubscriptionsForOwner("owner")).hasSize(1);