NONRTRIC - Using S3 storage for ICS
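
ICS can now optionally persist its data (info jobs, info types and
info type subscriptions) in an S3 object store instead of the local
file system. A new DataStore abstraction with FileStore and
S3ObjectStore implementations replaces the direct file handling in
the repositories, and the restore operations are now reactive.

S3 storage is used when both app.s3.endpointOverride and app.s3.bucket
are configured; otherwise the existing vardata-directory based file
storage is kept. Example configuration (the bucket name is only an
illustration; the endpoint and credentials match the values added to
config/application.yaml):

  app:
    s3:
      endpointOverride: http://localhost:9000
      accessKeyId: minio
      secretAccessKey: miniostorage
      bucket: ics-data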

Change-Id: I7498b18f414d328094ad2cf1da093e74220e4bbb
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-810
diff --git a/config/application.yaml b/config/application.yaml
index fcfc672..ce6b1ef 100644
--- a/config/application.yaml
+++ b/config/application.yaml
@@ -67,3 +67,9 @@
   vardata-directory: /var/information-coordinator-service
   # If the file name is empty, no authorization token is used
   auth-token-file:
+  # The S3 object store is enabled by defining a bucket to use. When enabled, it overrides the vardata-directory parameter.
+  s3:
+    endpointOverride: http://localhost:9000
+    accessKeyId: minio
+    secretAccessKey: miniostorage
+    bucket:
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 7451773..e89058a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,7 +60,7 @@
         <jacoco-maven-plugin.version>0.8.5</jacoco-maven-plugin.version>
         <exec.skip>true</exec.skip>
     </properties>
-    <dependencies>        
+    <dependencies>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-web</artifactId>
@@ -135,6 +135,16 @@
             <artifactId>spring-boot-configuration-processor</artifactId>
             <optional>true</optional>
         </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+            <version>2.17.292</version>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk</artifactId>
+            <version>1.12.321</version>
+        </dependency>
         <!-- TEST -->
         <dependency>
             <groupId>org.springdoc</groupId>
@@ -346,4 +356,4 @@
         <system>JIRA</system>
         <url>https://jira.o-ran-sc.org/</url>
     </issueManagement>
-</project>
+</project>
\ No newline at end of file
diff --git a/src/main/java/org/oransc/ics/BeanFactory.java b/src/main/java/org/oransc/ics/BeanFactory.java
index ac6190f..233be90 100644
--- a/src/main/java/org/oransc/ics/BeanFactory.java
+++ b/src/main/java/org/oransc/ics/BeanFactory.java
@@ -29,6 +29,7 @@
 import org.oransc.ics.configuration.ApplicationConfig;
 import org.oransc.ics.controllers.r1producer.ProducerCallbacks;
 import org.oransc.ics.repository.InfoJobs;
+import org.oransc.ics.repository.InfoTypeSubscriptions;
 import org.oransc.ics.repository.InfoTypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,11 +70,7 @@
     public InfoJobs infoJobs(SecurityContext securityContext, InfoTypes types) {
         if (infoJobs == null) {
             infoJobs = new InfoJobs(getApplicationConfig(), types, producerCallbacks(securityContext));
-            try {
-                infoJobs.restoreJobsFromDatabase();
-            } catch (Exception e) {
-                logger.error("Could not restore jobs from database: {}", e.getMessage());
-            }
+            infoJobs.restoreJobsFromDatabase().subscribe();
         }
         return infoJobs;
     }
@@ -83,7 +80,7 @@
         if (this.infoTypes == null) {
             infoTypes = new InfoTypes(getApplicationConfig());
             try {
-                infoTypes.restoreTypesFromDatabase();
+                infoTypes.restoreTypesFromDatabase().blockLast();
             } catch (Exception e) {
                 logger.error("Could not restore Information Types from database: {}", e.getMessage());
             }
@@ -92,6 +89,13 @@
     }
 
     @Bean
+    public InfoTypeSubscriptions infoTypeSubscriptions() {
+        InfoTypeSubscriptions s = new InfoTypeSubscriptions(getApplicationConfig());
+        s.restoreFromDatabase().subscribe();
+        return s;
+    }
+
+    @Bean
     public ProducerCallbacks producerCallbacks(SecurityContext securityContext) {
         if (this.producerCallbacks == null) {
             producerCallbacks = new ProducerCallbacks(getApplicationConfig(), securityContext);
diff --git a/src/main/java/org/oransc/ics/configuration/ApplicationConfig.java b/src/main/java/org/oransc/ics/configuration/ApplicationConfig.java
index 50c6daa..6f48cd3 100644
--- a/src/main/java/org/oransc/ics/configuration/ApplicationConfig.java
+++ b/src/main/java/org/oransc/ics/configuration/ApplicationConfig.java
@@ -66,6 +66,22 @@
     @Value("${app.webclient.http.proxy-port:0}")
     private int httpProxyPort = 0;
 
+    @Getter
+    @Value("${app.s3.endpointOverride:}")
+    private String s3EndpointOverride;
+
+    @Getter
+    @Value("${app.s3.accessKeyId:}")
+    private String s3AccessKeyId;
+
+    @Getter
+    @Value("${app.s3.secretAccessKey:}")
+    private String s3SecretAccessKey;
+
+    @Getter
+    @Value("${app.s3.bucket:}")
+    private String s3Bucket;
+
     private WebClientConfig webClientConfig = null;
 
     public WebClientConfig getWebClientConfig() {
@@ -93,4 +109,8 @@
         return this.webClientConfig;
     }
 
+    public boolean isS3Enabled() {
+        return !(s3EndpointOverride.isBlank() || s3Bucket.isBlank());
+    }
+
 }
diff --git a/src/main/java/org/oransc/ics/controllers/a1e/A1eCallbacks.java b/src/main/java/org/oransc/ics/controllers/a1e/A1eCallbacks.java
index 15394ab..82b4c37 100644
--- a/src/main/java/org/oransc/ics/controllers/a1e/A1eCallbacks.java
+++ b/src/main/java/org/oransc/ics/controllers/a1e/A1eCallbacks.java
@@ -73,9 +73,9 @@
 
     private Mono<String> noifyStatusToJobOwner(InfoJob job, InfoProducers eiProducers) {
         boolean isJobEnabled = eiProducers.isJobEnabled(job);
-        A1eEiJobStatus status = isJobEnabled ? new A1eEiJobStatus(A1eEiJobStatus.EiJobStatusValues.ENABLED)
+        A1eEiJobStatus jobStatus = isJobEnabled ? new A1eEiJobStatus(A1eEiJobStatus.EiJobStatusValues.ENABLED)
             : new A1eEiJobStatus(A1eEiJobStatus.EiJobStatusValues.DISABLED);
-        String body = gson.toJson(status);
+        String body = gson.toJson(jobStatus);
         return this.restClient.post(job.getJobStatusUrl(), body) //
             .doOnNext(response -> logger.debug("Consumer notified OK {}", job.getId())) //
             .doOnNext(response -> job.setLastReportedStatus(isJobEnabled)) //
diff --git a/src/main/java/org/oransc/ics/datastore/DataStore.java b/src/main/java/org/oransc/ics/datastore/DataStore.java
new file mode 100644
index 0000000..78d5d13
--- /dev/null
+++ b/src/main/java/org/oransc/ics/datastore/DataStore.java
@@ -0,0 +1,48 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.ics.datastore;
+
+import org.oransc.ics.configuration.ApplicationConfig;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
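+/**
+ * Interface for the stores that hold the persistent data of the service.
+ * Depending on configuration, data is kept either in the local file system
+ * (FileStore) or in an S3 object store (S3ObjectStore).
+ */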
+public interface DataStore {
+
+    public Flux<String> listObjects(String prefix);
+
+    public Mono<byte[]> readObject(String fileName);
+
+    public Mono<byte[]> writeObject(String fileName, byte[] fileData);
+
+    public Mono<Boolean> deleteObject(String name);
+
+    public Mono<String> createDataStore();
+
+    public Mono<String> deleteAllData();
+
+    public Mono<String> deleteBucket();
+
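+    /**
+     * Creates an S3 backed store if S3 is enabled in the configuration,
+     * otherwise a file system backed store. The given location is used as a
+     * key prefix (S3) or sub-directory (file system) for the stored objects.
+     */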
+    static DataStore create(ApplicationConfig config, String location) {
+        return config.isS3Enabled() ? new S3ObjectStore(config, location) : new FileStore(config, location);
+    }
+
+}
diff --git a/src/main/java/org/oransc/ics/datastore/FileStore.java b/src/main/java/org/oransc/ics/datastore/FileStore.java
new file mode 100644
index 0000000..bd9c91c
--- /dev/null
+++ b/src/main/java/org/oransc/ics/datastore/FileStore.java
@@ -0,0 +1,158 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.ics.datastore;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.oransc.ics.configuration.ApplicationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.FileSystemUtils;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+class FileStore implements DataStore {
+    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    ApplicationConfig applicationConfig;
+    private final String location;
+
+    public FileStore(ApplicationConfig applicationConfig, String location) {
+        this.applicationConfig = applicationConfig;
+        this.location = location;
+    }
+
+    @Override
+    public Flux<String> listObjects(String prefix) {
+        Path root = Path.of(path().toString(), prefix);
+        if (!root.toFile().exists()) {
+            root = root.getParent();
+        }
+
+        logger.debug("Listing files in: {}", root);
+
+        List<String> result = new ArrayList<>();
+        try (Stream<Path> stream = Files.walk(root, Integer.MAX_VALUE)) {
+
+            stream.forEach(path -> filterListFiles(path, prefix, result));
+
+            return Flux.fromIterable(result);
+        } catch (Exception e) {
+            return Flux.error(e);
+        }
+    }
+
+    private void filterListFiles(Path path, String prefix, List<String> result) {
+        if (path.toFile().isFile() && externalName(path).startsWith(prefix)) {
+            result.add(externalName(path));
+        } else {
+            logger.debug("Ignoring file/directory {}, prefix: {}", path, prefix);
+        }
+    }
+
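+    // Converts a path in the local file system to the externally visible
+    // object name, that is the path relative to the store root without a
+    // leading '/'.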
+    private String externalName(Path path) {
+        String fullName = path.toString();
+        String externalName = fullName.substring(path().toString().length());
+        if (externalName.startsWith("/")) {
+            externalName = externalName.substring(1);
+        }
+        return externalName;
+    }
+
+    @Override
+    public Mono<byte[]> readObject(String fileName) {
+        try {
+            byte[] contents = Files.readAllBytes(path(fileName));
+            return Mono.just(contents);
+        } catch (Exception e) {
+            return Mono.error(e);
+        }
+    }
+
+    @Override
+    public Mono<Boolean> deleteObject(String name) {
+        try {
+            Files.delete(path(name));
+            return Mono.just(true);
+        } catch (Exception e) {
+            return Mono.just(false);
+        }
+    }
+
+    @Override
+    public Mono<String> createDataStore() {
+        try {
+            Files.createDirectories(path());
+        } catch (IOException e) {
+            logger.error("Could not create directory: {}, reason: {}", path(), e.getMessage());
+        }
+        return Mono.just("OK");
+    }
+
+    private Path path(String name) {
+        return Path.of(path().toString(), name);
+    }
+
+    private Path path() {
+        return Path.of(applicationConfig.getVardataDirectory(), "database", this.location);
+    }
+
+    @Override
+    public Mono<String> deleteAllData() {
+        return listObjects("") //
+            .flatMap(this::deleteObject) //
+            .collectList() //
+            .map(o -> "OK");
+    }
+
+    @Override
+    public Mono<byte[]> writeObject(String fileName, byte[] fileData) {
+        try {
+            File outputFile = path(fileName).toFile();
+            try (FileOutputStream outputStream = new FileOutputStream(outputFile)) {
+                outputStream.write(fileData);
+            }
+        } catch (IOException e) {
+            logger.debug("Could not write file: {}, reason; {}", path(fileName), e.getMessage());
+        }
+        return Mono.just(fileData);
+    }
+
+    @Override
+    public Mono<String> deleteBucket() {
+        try {
+            FileSystemUtils.deleteRecursively(path(""));
+        } catch (IOException e) {
+            logger.warn("Could not delete: {}, reason: {}", path(""), e.getMessage());
+        }
+        return Mono.just(path("").toString());
+    }
+
+}
diff --git a/src/main/java/org/oransc/ics/datastore/S3ObjectStore.java b/src/main/java/org/oransc/ics/datastore/S3ObjectStore.java
new file mode 100644
index 0000000..3434b67
--- /dev/null
+++ b/src/main/java/org/oransc/ics/datastore/S3ObjectStore.java
@@ -0,0 +1,232 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.ics.datastore;
+
+import java.net.URI;
+import java.util.concurrent.CompletableFuture;
+
+import org.oransc.ics.configuration.ApplicationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+class S3ObjectStore implements DataStore {
+    private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
+    private final ApplicationConfig applicationConfig;
+
+    private static S3AsyncClient s3AsynchClient;
+    private final String location;
+
+    public S3ObjectStore(ApplicationConfig applicationConfig, String location) {
+        this.applicationConfig = applicationConfig;
+        this.location = location;
+
+        getS3AsynchClient(applicationConfig);
+    }
+
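+    /**
+     * The S3 client is created lazily and shared by all S3ObjectStore
+     * instances.
+     */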
+    private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) {
+        if (applicationConfig.isS3Enabled() && s3AsynchClient == null) {
+            s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build();
+        }
+        return s3AsynchClient;
+    }
+
+    private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) {
+        URI uri = URI.create(applicationConfig.getS3EndpointOverride());
+        return S3AsyncClient.builder() //
+            .region(Region.US_EAST_1) //
+            .endpointOverride(uri) //
+            .credentialsProvider(StaticCredentialsProvider.create( //
+                AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
+                    applicationConfig.getS3SecretAccessKey())));
+    }
+
+    @Override
+    public Flux<String> listObjects(String prefix) {
+        return listObjectsInBucket(bucket(), location + "/" + prefix).map(S3Object::key) //
+            .map(this::externalName);
+    }
+
+    @Override
+    public Mono<Boolean> deleteObject(String name) {
+        DeleteObjectRequest request = DeleteObjectRequest.builder() //
+            .bucket(bucket()) //
+            .key(key(name)) //
+            .build();
+
+        CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request);
+
+        return Mono.fromFuture(future).map(resp -> true);
+    }
+
+    @Override
+    public Mono<byte[]> readObject(String fileName) {
+        return getDataFromS3Object(bucket(), fileName);
+    }
+
+    @Override
+    public Mono<byte[]> writeObject(String fileName, byte[] fileData) {
+
+        PutObjectRequest request = PutObjectRequest.builder() //
+            .bucket(bucket()) //
+            .key(key(fileName)) //
+            .build();
+
+        AsyncRequestBody body = AsyncRequestBody.fromBytes(fileData);
+
+        CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
+
+        return Mono.fromFuture(future) //
+            .map(putObjectResponse -> fileData) //
+            .doOnError(t -> logger.error("Failed to store file in S3 {}", t.getMessage()));
+    }
+
+    @Override
+    public Mono<String> createDataStore() {
+        return createS3Bucket(bucket());
+    }
+
+    private Mono<String> createS3Bucket(String s3Bucket) {
+
+        CreateBucketRequest request = CreateBucketRequest.builder() //
+            .bucket(s3Bucket) //
+            .build();
+
+        CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request);
+
+        return Mono.fromFuture(future) //
+            .map(f -> s3Bucket) //
+            .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
+            .onErrorResume(t -> Mono.just(s3Bucket));
+    }
+
+    @Override
+    public Mono<String> deleteAllData() {
+        return listObjects("") //
+            .flatMap(key -> deleteObject(key)) //
+            .collectList() //
+            .map(resp -> "OK")
+            .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(), t.getMessage()))
+            .onErrorResume(t -> Mono.just("NOK"));
+    }
+
+    @Override
+    public Mono<String> deleteBucket() {
+        return deleteBucketFromS3Storage()
+            .doOnError(t -> logger.warn("Could not delete: {}, reason: {}", bucket(), t.getMessage()))
+            .map(x -> bucket()).onErrorResume(t -> Mono.just(bucket()));
+    }
+
+    private Mono<DeleteBucketResponse> deleteBucketFromS3Storage() {
+        DeleteBucketRequest request = DeleteBucketRequest.builder() //
+            .bucket(bucket()) //
+            .build();
+
+        CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request);
+
+        return Mono.fromFuture(future);
+    }
+
+    private String bucket() {
+        return applicationConfig.getS3Bucket();
+    }
+
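+    /**
+     * Requests one page (at most 1000 keys) of the object listing. If a
+     * previous, truncated response is given, the next page is requested
+     * using its marker; an empty Mono ends the pagination.
+     */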
+    private Mono<ListObjectsResponse> listObjectsRequest(String bucket, String prefix,
+        ListObjectsResponse prevResponse) {
+        ListObjectsRequest.Builder builder = ListObjectsRequest.builder() //
+            .bucket(bucket) //
+            .maxKeys(1000) //
+            .prefix(prefix);
+
+        if (prevResponse != null) {
+            if (Boolean.TRUE.equals(prevResponse.isTruncated())) {
+                builder.marker(prevResponse.nextMarker());
+            } else {
+                return Mono.empty();
+            }
+        }
+
+        ListObjectsRequest listObjectsRequest = builder.build();
+        CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
+        return Mono.fromFuture(future);
+    }
+
+    private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
+
+        return listObjectsRequest(bucket, prefix, null) //
+            .expand(response -> listObjectsRequest(bucket, prefix, response)) //
+            .map(ListObjectsResponse::contents) //
+            .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
+            .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
+            .flatMap(Flux::fromIterable) //
+            .doOnNext(obj -> logger.debug("Found object: {}", obj.key()));
+    }
+
+    private Mono<byte[]> getDataFromS3Object(String bucket, String fileName) {
+
+        GetObjectRequest request = GetObjectRequest.builder() //
+            .bucket(bucket) //
+            .key(key(fileName)) //
+            .build();
+
+        CompletableFuture<ResponseBytes<GetObjectResponse>> future =
+            s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
+
+        return Mono.fromFuture(future) //
+            .map(b -> b.asByteArray()) //
+            .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key(fileName), bucket,
+                t.getMessage())) //
+            .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key(fileName))) //
+            .onErrorResume(t -> Mono.empty());
+    }
+
+    private String key(String fileName) {
+        return location + "/" + fileName;
+    }
+
+    private String externalName(String internalName) {
+        return internalName.substring(key("").length());
+    }
+
+}
diff --git a/src/main/java/org/oransc/ics/repository/InfoJobs.java b/src/main/java/org/oransc/ics/repository/InfoJobs.java
index 8f8e0e9..8085573 100644
--- a/src/main/java/org/oransc/ics/repository/InfoJobs.java
+++ b/src/main/java/org/oransc/ics/repository/InfoJobs.java
@@ -24,27 +24,22 @@
 import com.google.gson.GsonBuilder;
 import com.google.gson.TypeAdapterFactory;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
 import java.lang.invoke.MethodHandles;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.ServiceLoader;
 import java.util.Vector;
 
 import org.oransc.ics.configuration.ApplicationConfig;
 import org.oransc.ics.controllers.r1producer.ProducerCallbacks;
+import org.oransc.ics.datastore.DataStore;
 import org.oransc.ics.exceptions.ServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.HttpStatus;
-import org.springframework.util.FileSystemUtils;
+import reactor.core.publisher.Flux;
 
 /**
  * Dynamic representation of all existing Information Jobs.
@@ -62,6 +57,8 @@
 
     private final ProducerCallbacks producerCallbacks;
 
+    private final DataStore dataStore;
+
     public InfoJobs(ApplicationConfig config, InfoTypes infoTypes, ProducerCallbacks producerCallbacks) {
         this.config = config;
         GsonBuilder gsonBuilder = new GsonBuilder();
@@ -69,40 +66,47 @@
         this.gson = gsonBuilder.create();
         this.producerCallbacks = producerCallbacks;
         this.infoTypes = infoTypes;
+        this.dataStore = DataStore.create(config, "infojobs");
+        this.dataStore.createDataStore().subscribe();
     }
 
-    public synchronized void restoreJobsFromDatabase() throws IOException {
-        Files.createDirectories(Paths.get(getDatabaseDirectory()));
-        File dbDir = new File(getDatabaseDirectory());
+    public synchronized Flux<InfoJob> restoreJobsFromDatabase() {
+        return dataStore.listObjects("") //
+            .flatMap(dataStore::readObject) //
+            .map(this::toPersistentData) //
+            .map(this::toInfoJob) //
+            .filter(Objects::nonNull) //
+            .doOnNext(this::doPut) //
+            .doOnError(t -> logger.error("Could not restore jobs from datastore, reason: {}", t.getMessage()));
+    }
 
-        for (File file : dbDir.listFiles()) {
-            String json = Files.readString(file.toPath());
-            InfoJob.PersistentData data = gson.fromJson(json, InfoJob.PersistentData.class);
-            try {
-                InfoJob job = toInfoJob(data);
-                this.doPut(job);
-            } catch (ServiceException e) {
-                logger.warn("Could not restore job:{},reason: {}", data.getId(), e.getMessage());
-            }
+    private InfoJob.PersistentData toPersistentData(byte[] bytes) {
+        String json = new String(bytes);
+        return gson.fromJson(json, InfoJob.PersistentData.class);
+    }
+
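+    // Returns null if the type of the stored job is unknown; such jobs are
+    // filtered out when restoring from the datastore.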
+    private InfoJob toInfoJob(InfoJob.PersistentData data) {
+        InfoType type;
+        try {
+            type = infoTypes.getType(data.getTypeId());
+            return InfoJob.builder() //
+                .id(data.getId()) //
+                .type(type) //
+                .owner(data.getOwner()) //
+                .jobData(data.getJobData()) //
+                .targetUrl(data.getTargetUrl()) //
+                .jobStatusUrl(data.getJobStatusUrl()) //
+                .lastUpdated(data.getLastUpdated()) //
+                .build();
+        } catch (ServiceException e) {
+            logger.error("Error restoring info job: {}, reason: {}", data.getId(), e.getMessage());
         }
-    }
-
-    private InfoJob toInfoJob(InfoJob.PersistentData data) throws ServiceException {
-        InfoType type = infoTypes.getType(data.getTypeId());
-        return InfoJob.builder() //
-            .id(data.getId()) //
-            .type(type) //
-            .owner(data.getOwner()) //
-            .jobData(data.getJobData()) //
-            .targetUrl(data.getTargetUrl()) //
-            .jobStatusUrl(data.getJobStatusUrl()) //
-            .lastUpdated(data.getLastUpdated()) //
-            .build();
+        return null;
     }
 
     public synchronized void put(InfoJob job) {
         this.doPut(job);
-        storeJobInFile(job);
+        storeJob(job);
     }
 
     public synchronized Collection<InfoJob> getJobs() {
@@ -146,11 +150,8 @@
         jobsByType.remove(job.getType().getId(), job);
         jobsByOwner.remove(job.getOwner(), job);
 
-        try {
-            Files.delete(getPath(job));
-        } catch (IOException e) {
-            logger.warn("Could not remove file: {}", e.getMessage());
-        }
+        this.dataStore.deleteObject(getPath(job)).subscribe();
+
         this.producerCallbacks.stopInfoJob(job, infoProducers);
 
     }
@@ -163,16 +164,8 @@
         this.allEiJobs.clear();
         this.jobsByType.clear();
         jobsByOwner.clear();
-        clearDatabase();
-    }
 
-    private void clearDatabase() {
-        try {
-            FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
-            Files.createDirectories(Paths.get(getDatabaseDirectory()));
-        } catch (IOException e) {
-            logger.warn("Could not delete database : {}", e.getMessage());
-        }
+        dataStore.deleteAllData().flatMap(s -> dataStore.createDataStore()).block();
     }
 
     private void doPut(InfoJob job) {
@@ -181,26 +174,16 @@
         jobsByOwner.put(job.getOwner(), job);
     }
 
-    private void storeJobInFile(InfoJob job) {
-        try {
-            try (PrintStream out = new PrintStream(new FileOutputStream(getFile(job)))) {
-                out.print(gson.toJson(job.getPersistentData()));
-            }
-        } catch (Exception e) {
-            logger.warn("Could not store job: {} {}", job.getId(), e.getMessage());
-        }
+    private void storeJob(InfoJob job) {
+        String json = gson.toJson(job.getPersistentData());
+        byte[] bytes = json.getBytes();
+        this.dataStore.writeObject(this.getPath(job), bytes) //
+            .doOnError(t -> logger.error("Could not store job in datastore, reason: {}", t.getMessage())) //
+            .subscribe();
     }
 
-    private File getFile(InfoJob job) {
-        return getPath(job).toFile();
-    }
-
-    private Path getPath(InfoJob job) {
-        return Path.of(getDatabaseDirectory(), job.getId());
-    }
-
-    private String getDatabaseDirectory() {
-        return config.getVardataDirectory() + "/database/eijobs";
+    private String getPath(InfoJob job) {
+        return job.getId();
     }
 
 }
diff --git a/src/main/java/org/oransc/ics/repository/InfoProducers.java b/src/main/java/org/oransc/ics/repository/InfoProducers.java
index 4ea881f..e8e206a 100644
--- a/src/main/java/org/oransc/ics/repository/InfoProducers.java
+++ b/src/main/java/org/oransc/ics/repository/InfoProducers.java
@@ -135,9 +135,8 @@
     }
 
     public synchronized boolean isJobEnabled(InfoJob job) {
-        InfoType type;
         try {
-            type = this.infoTypes.getType(job.getType().getId());
+            InfoType type = this.infoTypes.getType(job.getType().getId());
 
             for (InfoProducer producer : this.getProducersSupportingType(type)) {
                 if (producer.isJobEnabled(job)) {
diff --git a/src/main/java/org/oransc/ics/repository/InfoTypeSubscriptions.java b/src/main/java/org/oransc/ics/repository/InfoTypeSubscriptions.java
index 2557f3c..97fa487 100644
--- a/src/main/java/org/oransc/ics/repository/InfoTypeSubscriptions.java
+++ b/src/main/java/org/oransc/ics/repository/InfoTypeSubscriptions.java
@@ -23,18 +23,12 @@
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
 import java.lang.invoke.MethodHandles;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Vector;
 import java.util.function.Function;
 
@@ -42,13 +36,13 @@
 import lombok.Getter;
 
 import org.oransc.ics.configuration.ApplicationConfig;
+import org.oransc.ics.datastore.DataStore;
 import org.oransc.ics.exceptions.ServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.http.HttpStatus;
-import org.springframework.util.FileSystemUtils;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -66,6 +60,7 @@
     private final Gson gson = new GsonBuilder().create();
     private final ApplicationConfig config;
     private final Map<String, ConsumerCallbackHandler> callbackHandlers = new HashMap<>();
+    private final DataStore dataStore;
 
     public interface ConsumerCallbackHandler {
         Mono<String> notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo);
@@ -96,17 +91,12 @@
             }
             return this.id.equals(o);
         }
-
     }
 
     public InfoTypeSubscriptions(@Autowired ApplicationConfig config) {
         this.config = config;
-
-        try {
-            this.restoreFromDatabase();
-        } catch (IOException e) {
-            logger.error("Could not restore info type subscriptions from database {}", this.getDatabaseDirectory());
-        }
+        this.dataStore = DataStore.create(config, "infotypesubscriptions");
+        this.dataStore.createDataStore().subscribe();
     }
 
     public void registerCallbackhandler(ConsumerCallbackHandler handler, String apiVersion) {
@@ -162,12 +152,7 @@
     public void remove(SubscriptionInfo subscription) {
         allSubscriptions.remove(subscription.getId());
         subscriptionsByOwner.remove(subscription.owner, subscription);
-
-        try {
-            Files.delete(getPath(subscription));
-        } catch (Exception e) {
-            logger.debug("Could not delete subscription from database: {}", e.getMessage());
-        }
+        dataStore.deleteObject(getPath(subscription)).subscribe();
 
         logger.debug("Removed type status subscription {}", subscription.id);
     }
@@ -247,34 +232,28 @@
     }
 
     private void clearDatabase() {
-        try {
-            FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
-            Files.createDirectories(Paths.get(getDatabaseDirectory()));
-        } catch (IOException e) {
-            logger.warn("Could not delete database : {}", e.getMessage());
-        }
+        this.dataStore.deleteAllData().block();
     }
 
     private void storeInFile(SubscriptionInfo subscription) {
-        try {
-            try (PrintStream out = new PrintStream(new FileOutputStream(getFile(subscription)))) {
-                String json = gson.toJson(subscription);
-                out.print(json);
-            }
-        } catch (Exception e) {
-            logger.warn("Could not save subscription: {} {}", subscription.getId(), e.getMessage());
-        }
+        String json = gson.toJson(subscription);
+        byte[] bytes = json.getBytes();
+        this.dataStore.writeObject(this.getPath(subscription), bytes)
+            .doOnError(t -> logger.error("Could not store infotype subscription, reason: {}", t.getMessage())) //
+            .subscribe();
     }
 
-    public synchronized void restoreFromDatabase() throws IOException {
-        Files.createDirectories(Paths.get(getDatabaseDirectory()));
-        File dbDir = new File(getDatabaseDirectory());
+    public synchronized Flux<SubscriptionInfo> restoreFromDatabase() {
+        return dataStore.listObjects("") //
+            .flatMap(dataStore::readObject) //
+            .map(this::toSubscriptionInfo) //
+            .filter(Objects::nonNull) //
+            .doOnNext(this::doPut);
+    }
 
-        for (File file : dbDir.listFiles()) {
-            String json = Files.readString(file.toPath());
-            SubscriptionInfo subscription = gson.fromJson(json, SubscriptionInfo.class);
-            doPut(subscription);
-        }
+    private SubscriptionInfo toSubscriptionInfo(byte[] bytes) {
+        String json = new String(bytes);
+        return gson.fromJson(json, SubscriptionInfo.class);
     }
 
     private void doPut(SubscriptionInfo subscription) {
@@ -282,20 +261,8 @@
         subscriptionsByOwner.put(subscription.owner, subscription);
     }
 
-    private File getFile(SubscriptionInfo subscription) {
-        return getPath(subscription).toFile();
-    }
-
-    private Path getPath(SubscriptionInfo subscription) {
-        return getPath(subscription.getId());
-    }
-
-    private Path getPath(String subscriptionId) {
-        return Path.of(getDatabaseDirectory(), subscriptionId);
-    }
-
-    private String getDatabaseDirectory() {
-        return config.getVardataDirectory() + "/database/infotypesubscriptions";
+    private String getPath(SubscriptionInfo subscription) {
+        return subscription.getId();
     }
 
 }
diff --git a/src/main/java/org/oransc/ics/repository/InfoTypes.java b/src/main/java/org/oransc/ics/repository/InfoTypes.java
index 110bca2..98188ff 100644
--- a/src/main/java/org/oransc/ics/repository/InfoTypes.java
+++ b/src/main/java/org/oransc/ics/repository/InfoTypes.java
@@ -24,26 +24,21 @@
 import com.google.gson.GsonBuilder;
 import com.google.gson.TypeAdapterFactory;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
 import java.lang.invoke.MethodHandles;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.ServiceLoader;
 import java.util.Vector;
 
 import org.oransc.ics.configuration.ApplicationConfig;
+import org.oransc.ics.datastore.DataStore;
 import org.oransc.ics.exceptions.ServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.HttpStatus;
-import org.springframework.util.FileSystemUtils;
+import reactor.core.publisher.Flux;
 
 /**
  * Dynamic representation of all Information Types in the system.
@@ -54,24 +49,30 @@
     private final Map<String, InfoType> allInfoTypes = new HashMap<>();
     private final ApplicationConfig config;
     private final Gson gson;
+    private final DataStore dataStore;
 
     public InfoTypes(ApplicationConfig config) {
         this.config = config;
         GsonBuilder gsonBuilder = new GsonBuilder();
         ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
         this.gson = gsonBuilder.create();
+
+        this.dataStore = DataStore.create(config, "infotypes");
+        this.dataStore.createDataStore().subscribe();
     }
 
-    public synchronized void restoreTypesFromDatabase() throws IOException {
-        Files.createDirectories(Paths.get(getDatabaseDirectory()));
-        File dbDir = new File(getDatabaseDirectory());
+    public synchronized Flux<InfoType> restoreTypesFromDatabase() {
+        return dataStore.listObjects("") //
+            .flatMap(dataStore::readObject) //
+            .map(this::toInfoType) //
+            .filter(Objects::nonNull) //
+            .doOnNext(type -> allInfoTypes.put(type.getId(), type)) //
+            .doOnError(t -> logger.error("Could not restore types from datastore, reason: {}", t.getMessage()));
+    }
 
-        for (File file : dbDir.listFiles()) {
-            String json = Files.readString(file.toPath());
-            InfoType.PersistentInfo storedData = gson.fromJson(json, InfoType.PersistentInfo.class);
-            InfoType type = new InfoType(storedData);
-            allInfoTypes.put(type.getId(), type);
-        }
+    private InfoType toInfoType(byte[] bytes) {
+        String json = new String(bytes);
+        return gson.fromJson(json, InfoType.class);
     }
 
     public synchronized void put(InfoType type) {
@@ -97,11 +98,7 @@
 
     public synchronized void remove(InfoType type) {
         allInfoTypes.remove(type.getId());
-        try {
-            Files.delete(getPath(type));
-        } catch (IOException e) {
-            logger.warn("Could not remove file: {} {}", type.getId(), e.getMessage());
-        }
+        dataStore.deleteObject(getPath(type)).block();
     }
 
     public synchronized int size() {
@@ -110,7 +107,7 @@
 
     public synchronized void clear() {
         this.allInfoTypes.clear();
-        clearDatabase();
+        dataStore.deleteAllData().flatMap(s -> dataStore.createDataStore()).block();
     }
 
     public synchronized InfoType getCompatibleType(String typeId) throws ServiceException {
@@ -127,38 +124,15 @@
         return compatibleTypes.iterator().next();
     }
 
-    private void clearDatabase() {
-        try {
-            FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
-            Files.createDirectories(Paths.get(getDatabaseDirectory()));
-        } catch (IOException e) {
-            logger.warn("Could not delete database : {}", e.getMessage());
-        }
-    }
-
     private void storeInFile(InfoType type) {
-        try {
-            try (PrintStream out = new PrintStream(new FileOutputStream(getFile(type)))) {
-                out.print(gson.toJson(type.getPersistentInfo()));
-            }
-        } catch (Exception e) {
-            logger.warn("Could not save type: {} {}", type.getId(), e.getMessage());
-        }
+        String json = gson.toJson(type);
+        byte[] bytes = json.getBytes();
+        this.dataStore.writeObject(this.getPath(type), bytes)
+            .doOnError(t -> logger.error("Could not store infotype in datastore, reason: {}", t.getMessage())) //
+            .subscribe();
     }
 
-    private File getFile(InfoType type) {
-        return getPath(type).toFile();
-    }
-
-    private Path getPath(InfoType type) {
-        return getPath(type.getId());
-    }
-
-    private Path getPath(String typeId) {
-        return Path.of(getDatabaseDirectory(), typeId);
-    }
-
-    private String getDatabaseDirectory() {
-        return config.getVardataDirectory() + "/database/eitypes";
+    private String getPath(InfoType type) {
+        return type.getId();
     }
 }
diff --git a/src/test/java/org/oransc/ics/ApplicationTest.java b/src/test/java/org/oransc/ics/ApplicationTest.java
index 77382b7..2f7c1bf 100644
--- a/src/test/java/org/oransc/ics/ApplicationTest.java
+++ b/src/test/java/org/oransc/ics/ApplicationTest.java
@@ -37,6 +37,7 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 
 import org.json.JSONObject;
@@ -68,6 +69,7 @@
 import org.oransc.ics.controllers.r1producer.ProducerJobInfo;
 import org.oransc.ics.controllers.r1producer.ProducerRegistrationInfo;
 import org.oransc.ics.controllers.r1producer.ProducerStatusInfo;
+import org.oransc.ics.datastore.DataStore;
 import org.oransc.ics.exceptions.ServiceException;
 import org.oransc.ics.repository.InfoJob;
 import org.oransc.ics.repository.InfoJobs;
@@ -103,7 +105,9 @@
         "server.ssl.key-store=./config/keystore.jks", //
         "app.webclient.trust-store=./config/truststore.jks", //
         "app.webclient.trust-store-used=true", //
-        "app.vardata-directory=./target"})
+        "app.vardata-directory=/tmp/ics", //
+        "app.s3.bucket=" // If this is set, S3 will be used to store data.
+    })
 class ApplicationTest {
     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -983,18 +987,37 @@
     }
 
     @Test
+    void testDb() {
+        DataStore db = DataStore.create(this.applicationConfig, "test");
+        db.createDataStore().block();
+        final int NO_OF_OBJS = 1200;
+        for (int i = 0; i < NO_OF_OBJS; ++i) {
+            String data = "data";
+            db.writeObject("Obj_" + i, data.getBytes()).block();
+        }
+
+        List<?> entries = db.listObjects("").collectList().block();
+        assertThat(entries).hasSize(NO_OF_OBJS);
+
+        db.listObjects("").doOnNext(name -> logger.debug("deleted {}", name)).flatMap(name -> db.deleteObject(name))
+            .blockLast();
+
+        db.createDataStore().block();
+
+    }
+
+    @Test
     void testJobDatabasePersistence() throws Exception {
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
         putInfoJob(TYPE_ID, "jobId1");
         putInfoJob(TYPE_ID, "jobId2");
 
         assertThat(this.infoJobs.size()).isEqualTo(2);
-
         {
             InfoJob savedJob = this.infoJobs.getJob("jobId1");
             // Restore the jobs
             InfoJobs jobs = new InfoJobs(this.applicationConfig, this.infoTypes, this.producerCallbacks);
-            jobs.restoreJobsFromDatabase();
+            jobs.restoreJobsFromDatabase().blockLast();
             assertThat(jobs.size()).isEqualTo(2);
             InfoJob restoredJob = jobs.getJob("jobId1");
             assertThat(restoredJob.getPersistentData()).isEqualTo(savedJob.getPersistentData());
@@ -1007,7 +1030,7 @@
         {
             // Restore the jobs, no jobs in database
             InfoJobs jobs = new InfoJobs(this.applicationConfig, this.infoTypes, this.producerCallbacks);
-            jobs.restoreJobsFromDatabase();
+            jobs.restoreJobsFromDatabase().blockLast();
             assertThat(jobs.size()).isZero();
         }
         logger.warn("Test removing a job when the db file is gone");
@@ -1028,7 +1051,7 @@
         {
             // Restore the types
             InfoTypes restoredTypes = new InfoTypes(this.applicationConfig);
-            restoredTypes.restoreTypesFromDatabase();
+            restoredTypes.restoreTypesFromDatabase().blockLast();
             InfoType restoredType = restoredTypes.getType(TYPE_ID);
             assertThat(restoredType.getPersistentInfo()).isEqualTo(savedType.getPersistentInfo());
             assertThat(restoredTypes.size()).isEqualTo(1);
@@ -1037,7 +1060,7 @@
             // Restore the jobs, no jobs in database
             InfoTypes restoredTypes = new InfoTypes(this.applicationConfig);
             restoredTypes.clear();
-            restoredTypes.restoreTypesFromDatabase();
+            restoredTypes.restoreTypesFromDatabase().blockLast();
             assertThat(restoredTypes.size()).isZero();
         }
         logger.warn("Test removing a job when the db file is gone");
@@ -1046,7 +1069,7 @@
     }
 
     @Test
-    void testConsumerTypeSubscriptionDatabase() {
+    void testConsumerTypeSubscriptionDatabase() throws Exception {
         final String callbackUrl = baseUrl() + ConsumerSimulatorController.getTypeStatusCallbackUrl();
         final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(callbackUrl, "owner");
 
@@ -1055,7 +1078,11 @@
         restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
         assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
 
+        if (this.applicationConfig.isS3Enabled()) {
+            Thread.sleep(1000); // Storing in S3 is asynchronous, so it can take a few milliseconds
+        }
         InfoTypeSubscriptions restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig);
+        restoredSubscriptions.restoreFromDatabase().blockLast();
         assertThat(restoredSubscriptions.size()).isEqualTo(1);
         assertThat(restoredSubscriptions.getSubscriptionsForOwner("owner")).hasSize(1);