Second part loading PRH CONF

*Priority for loading configuration
*Run asynchronus task in parallel which
is responsible for dynamic hot swaping
configuration from CONSUL/CBS

Change-Id: I03ca0458e34eb71404c5ee8263d4cd476e99290b
Issue-ID: DCAEGEN2-696
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
index 5fe7c60..2357e1d 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
@@ -27,7 +27,6 @@
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.Configuration;
-import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java
index d83c813..c5c77ec 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java
@@ -21,7 +21,6 @@
 package org.onap.dcaegen2.services.prh.configuration;
 
 import java.util.Optional;
-
 import java.util.function.Predicate;
 import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
 import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
@@ -32,7 +31,6 @@
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
-import org.springframework.stereotype.Component;
 
 
 /**
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java
new file mode 100644
index 0000000..808c4a5
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.configuration;
+
+import com.google.gson.JsonObject;
+import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
+import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.prh.config.ImmutableAaiClientConfiguration;
+import org.onap.dcaegen2.services.prh.config.ImmutableDmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.prh.config.ImmutableDmaapPublisherConfiguration;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/21/18
+ */
+class CloudConfigParser {
+
+    private final JsonObject jsonObject;
+
+    CloudConfigParser(JsonObject jsonObject) {
+        this.jsonObject = jsonObject;
+    }
+
+    DmaapPublisherConfiguration getDmaapPublisherConfig() {
+        return new ImmutableDmaapPublisherConfiguration.Builder()
+            .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString())
+            .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString())
+            .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt())
+            .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString())
+            .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
+            .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString())
+            .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString())
+            .build();
+    }
+
+    AaiClientConfiguration getAaiClientConfig() {
+        return new ImmutableAaiClientConfiguration.Builder()
+            .aaiHost(jsonObject.get("aai.aaiClientConfiguration.aaiHost").getAsString())
+            .aaiPort(jsonObject.get("aai.aaiClientConfiguration.aaiHostPortNumber").getAsInt())
+            .aaiUserName(jsonObject.get("aai.aaiClientConfiguration.aaiUserName").getAsString())
+            .aaiPnfPath(jsonObject.get("aai.aaiClientConfiguration.aaiPnfPath").getAsString())
+            .aaiIgnoreSslCertificateErrors(
+                jsonObject.get("aai.aaiClientConfiguration.aaiIgnoreSslCertificateErrors").getAsBoolean())
+            .aaiUserPassword(jsonObject.get("aai.aaiClientConfiguration.aaiUserPassword").getAsString())
+            .aaiProtocol(jsonObject.get("aai.aaiClientConfiguration.aaiProtocol").getAsString())
+            .aaiBasePath(jsonObject.get("aai.aaiClientConfiguration.aaiBasePath").getAsString())
+            .build();
+    }
+
+    DmaapConsumerConfiguration getDmaapConsumerConfig() {
+        return new ImmutableDmaapConsumerConfiguration.Builder()
+            .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMS").getAsInt())
+            .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString())
+            .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString())
+            .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString())
+            .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString())
+            .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt())
+            .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString())
+            .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt())
+            .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString())
+            .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
+            .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
+            .build();
+    }
+}
\ No newline at end of file
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
index 5b5c038..82017a9 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
@@ -20,7 +20,12 @@
 
 package org.onap.dcaegen2.services.prh.configuration;
 
+import com.google.gson.JsonObject;
+import java.util.Optional;
 import java.util.Properties;
+import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
+import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.services.prh.model.EnvProperties;
 import org.onap.dcaegen2.services.prh.service.HttpClientExecutorService;
 import org.slf4j.Logger;
@@ -46,6 +51,10 @@
     private Logger logger = LoggerFactory.getLogger(this.getClass());
     private HttpClientExecutorService httpClientExecutorService;
 
+    private AaiClientConfiguration aaiClientCloudConfiguration;
+    private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration;
+    private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration;
+
     TaskScheduler cloudTaskScheduler;
 
     @Value("#{systemEnvironment}")
@@ -62,17 +71,45 @@
     protected void runTask() {
         Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment))
             .subscribeOn(Schedulers.parallel())
-            .subscribe(this::doOnSucces, this::doOnError);
+            .subscribe(this::parsingConfigSuccess, this::parsingConfigError);
     }
 
-    private void doOnError(Throwable throwable) {
-        logger.warn("Error in case of processing system environment.%nMore details below:%n ", throwable);
+    private void parsingConfigError(Throwable throwable) {
+        logger.warn("Error in case of processing system environment, more details below: ", throwable);
     }
 
-    private void doOnSucces(EnvProperties envProperties) {
+    private void cloudConfigError(Throwable throwable) {
+        logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
+    }
+
+    private void parsingConfigSuccess(EnvProperties envProperties) {
         logger.info("Fetching PRH configuration from ConfigBindingService/Consul");
         Flux.just(httpClientExecutorService.callConsulForConfigBindingServiceEndpoint(envProperties))
-            .flatMap(configBindingServiceUri -> httpClientExecutorService.callConfigBindingServiceForPrhConfiguration(envProperties,
-                configBindingServiceUri)).subscribe();
+            .flatMap(configBindingServiceUri -> httpClientExecutorService
+                .callConfigBindingServiceForPrhConfiguration(envProperties,
+                    configBindingServiceUri)).subscribe(this::parseCloudConfig, this::cloudConfigError);
+    }
+
+    private void parseCloudConfig(JsonObject jsonObject) {
+        logger.info("Received application configuration: {}", jsonObject);
+        CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
+        dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
+        aaiClientCloudConfiguration = cloudConfigParser.getAaiClientConfig();
+        dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig();
+    }
+
+    @Override
+    public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
+        return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration());
+    }
+
+    @Override
+    public AaiClientConfiguration getAaiClientConfiguration() {
+        return Optional.ofNullable(aaiClientCloudConfiguration).orElse(super.getAaiClientConfiguration());
+    }
+
+    @Override
+    public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
+        return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration());
     }
 }
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
index d190510..d3b6cbb 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
@@ -34,13 +34,14 @@
  */
 class EnvironmentProcessor {
 
+    private static final int DEFAULT_CONSUL_PORT = 8500;
     private static Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class);
 
     private EnvironmentProcessor() {
     }
 
     static Flux<EnvProperties> evaluate(Properties systemEnvironment) {
-        logger.info("Loading configuration from system environment variables");
+        logger.info("Loading configuration from system environment variables {}", systemEnvironment);
         EnvProperties envProperties;
         try {
             envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment))
@@ -78,8 +79,8 @@
 
     private static Integer getDefaultPortOfConsul() {
         logger.warn("$CONSUL_PORT environment has not been defined");
-        logger.warn("$CONSUL_PORT variable will be set to default port {}", 8500);
-        return 8500;
+        logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
+        return DEFAULT_CONSUL_PORT;
     }
 }
 
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
index 1bce1c0..8782a18 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
@@ -26,7 +26,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 import javax.annotation.PostConstruct;
 import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -34,10 +33,8 @@
 import org.springframework.context.annotation.Configuration;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
-import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
-import org.springframework.scheduling.support.PeriodicTrigger;
 import reactor.core.publisher.Mono;
 
 /**
@@ -49,7 +46,7 @@
 
     private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 2000;
     private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 1;
-    private static volatile List<ScheduledFuture> scheduledPrgTaskFutureList = new ArrayList<>();
+    private static volatile List<ScheduledFuture> scheduledPrhTaskFutureList = new ArrayList<>();
 
     private final ConcurrentTaskScheduler taskScheduler;
     private final ScheduledTasks scheduledTask;
@@ -68,8 +65,8 @@
      */
     @ApiOperation(value = "Get response on stopping task execution")
     public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() {
-        scheduledPrgTaskFutureList.forEach(x -> x.cancel(false));
-        scheduledPrgTaskFutureList.clear();
+        scheduledPrhTaskFutureList.forEach(x -> x.cancel(false));
+        scheduledPrhTaskFutureList.clear();
         return Mono.defer(() ->
             Mono.just(new ResponseEntity<>("PRH Service has already been stopped!", HttpStatus.CREATED))
         );
@@ -84,11 +81,11 @@
     @PostConstruct
     @ApiOperation(value = "Start task if possible")
     public synchronized boolean tryToStartTask() {
-        if (scheduledPrgTaskFutureList.isEmpty()) {
-            scheduledPrgTaskFutureList.add(cloudTaskScheduler
+        if (scheduledPrhTaskFutureList.isEmpty()) {
+            scheduledPrhTaskFutureList.add(cloudTaskScheduler
                 .scheduleAtFixedRate(super::runTask, Instant.now(),
                     Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)));
-            scheduledPrgTaskFutureList.add(taskScheduler
+            scheduledPrhTaskFutureList.add(taskScheduler
                 .scheduleWithFixedDelay(scheduledTask::scheduleMainPrhEventTask, SCHEDULING_DELAY_FOR_PRH_TASKS));
             return true;
         } else {
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpClientExecutorService.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpClientExecutorService.java
index 01081f4..1b69f5f 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpClientExecutorService.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpClientExecutorService.java
@@ -20,9 +20,19 @@
 
 package org.onap.dcaegen2.services.prh.service;
 
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
 import org.onap.dcaegen2.services.prh.model.EnvProperties;
 import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
 import org.springframework.stereotype.Service;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/10/18
@@ -31,17 +41,96 @@
 @Service
 public class HttpClientExecutorService {
 
-    public String callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) {
-        return null;
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    public Mono<String> callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) {
+
+        return HttpGetClient.callHttpGet(
+            envProperties.consulHost() + ":" + envProperties.consulPort() + "/v1/catalog/service/" + envProperties
+                .cbsName())
+            .flatMap(this::getJsonArrayFromRequest)
+            .flatMap(jsonArray -> Mono.just(jsonArray.get(0)))
+            .flatMap(this::createConfigBindingServiceURL);
+
     }
 
-    public Publisher<String> callConfigBindingServiceForPrhConfiguration(EnvProperties envProperties,
-        String configBindingServiceUri) {
+    public Publisher<JsonObject> callConfigBindingServiceForPrhConfiguration(EnvProperties envProperties,
+        Mono<String> configBindingServiceUri) {
+        return HttpGetClient.callHttpGet(configBindingServiceUri + "/service_component/" + envProperties.appName())
+            .flatMap(this::getJsonConfiguration);
+    }
 
-        return null;
+    private Mono<? extends JsonObject> getJsonConfiguration(String body) {
+        JsonElement jsonElement = new Gson().toJsonTree(body);
+        try {
+            return Mono.just(jsonElement.getAsJsonObject());
+        } catch (IllegalStateException e) {
+            return Mono.error(e);
+        }
+    }
+
+    private Mono<String> createConfigBindingServiceURL(JsonElement jsonElement) {
+        JsonObject jsonObject;
+        try {
+            jsonObject = jsonElement.getAsJsonObject();
+        } catch (IllegalStateException e) {
+            return Mono.error(e);
+        }
+        return Mono.just(jsonObject.get("ServiceAddress").toString() + ":" + jsonObject.get("ServicePort").toString());
+    }
+
+
+    private Mono<? extends JsonArray> getJsonArrayFromRequest(String body) {
+        JsonElement jsonElement = new Gson().toJsonTree(body);
+        try {
+            return Mono.just(jsonElement.getAsJsonArray());
+        } catch (IllegalStateException e) {
+            logger.warn("Converting string to jsonArray threw error: " + e);
+            return Mono.error(e);
+        }
     }
 
     private static class HttpGetClient {
 
+        private static final Logger logger = LoggerFactory.getLogger(HttpGetClient.class);
+
+        private static WebClient webClient;
+
+        private HttpGetClient() {
+        }
+
+        private static Mono<String> callHttpGet(String url) {
+            return webClient
+                .get()
+                .uri(url)
+                .retrieve()
+                .onStatus(HttpStatus::is4xxClientError, response ->
+                    Mono.error(new Exception("Request for cloud config failed: HTTP 400")))
+                .onStatus(HttpStatus::is5xxServerError, response ->
+                    Mono.error(new Exception("Request for cloud config failed: HTTP 500")))
+                .bodyToMono(String.class);
+        }
+
+        private static ExchangeFilterFunction logResponse() {
+            return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
+                logger.info("Response status {}", clientResponse.statusCode());
+                return Mono.just(clientResponse);
+            });
+        }
+
+        private static ExchangeFilterFunction logRequest() {
+            return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
+                logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
+                clientRequest.headers()
+                    .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
+                return Mono.just(clientRequest);
+            });
+        }
+
+        static {
+            webClient = WebClient.builder().filter(logRequest()).filter(logResponse()).build();
+        }
+
+
     }
 }
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
index 0738847..eb62d3c 100644
--- a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
+++ b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
@@ -71,7 +71,7 @@
                 .body(BodyInserters.fromObject(consumerDmaapModelMono))
                 .retrieve()
                 .onStatus(HttpStatus::is4xxClientError, clientResponse ->
-                    Mono.error(new Exception("HTTP 400"))
+                     Mono.error(new Exception("HTTP 400"))
                 )
                 .onStatus(HttpStatus::is5xxServerError, clientResponse ->
                     Mono.error(new Exception("HTTP 500")))