Add A1 controller client in policy-agent

- Dashboard and policy-agent can run in docker containers
- Policy-agent container uses local json config file when not running consul

Change-Id: I4973999969e462a7c1c1a768b4dd9b91e8024542
Signed-off-by: RehanRaza <muhammad.rehan.raza@est.tech>
diff --git a/policy-agent/Dockerfile b/policy-agent/Dockerfile
index d0ddd40..26ce531 100644
--- a/policy-agent/Dockerfile
+++ b/policy-agent/Dockerfile
@@ -27,7 +27,7 @@
 
 EXPOSE 8081
 
-ADD /config/application.yaml /opt/app/policy-agent/config/
+ADD /config/* /opt/app/policy-agent/config/
 ADD target/${JAR} /opt/app/policy-agent/policy-agent.jar
 
 
diff --git a/policy-agent/config/application.yaml b/policy-agent/config/application.yaml
index b41c381..5b6df77 100644
--- a/policy-agent/config/application.yaml
+++ b/policy-agent/config/application.yaml
@@ -15,10 +15,10 @@
     org.springframework: ERROR
     org.springframework.data: ERROR
     org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
-    org.onap.dcaegen2.collectors.datafile: WARN
+    org.oransc.policyagent: WARN
   file: /var/log/policy-agent/application.log
 app:
-  filepath: src/test/resources/test_application_configuration.json
+  filepath: /opt/app/policy-agent/config/application_configuration.json
 
 server:
    port : 8081
diff --git a/policy-agent/config/application_configuration.json b/policy-agent/config/application_configuration.json
new file mode 100644
index 0000000..cb039ca
--- /dev/null
+++ b/policy-agent/config/application_configuration.json
@@ -0,0 +1,23 @@
+{
+   "config": {
+      "//description": "Application configuration",
+      "ric": [
+         {
+            "name": "ric1",
+            "baseUrl": "http://ric1:8085/",
+            "managedElementIds": [
+               "kista_1",
+               "kista_2"
+            ]
+         }
+      ]
+   },
+   "streams_subscribes": {
+      "dmaap_subscriber": {
+         "dmaap_info": {
+            "topic_url": "http://dradmin:dradmin@localhost:2222/events/A1-P/users/sdnc1"
+         },
+         "type": "message_router"
+      }
+   }
+}
\ No newline at end of file
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java
index 9814943..213dfa5 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/A1Client.java
@@ -30,7 +30,7 @@
 public interface A1Client {
 
     public static enum A1ProtocolType {
-        UNKNOWN, STD_V1, OSC_V1
+        UNKNOWN, STD_V1, OSC_V1, CONTROLLER
     }
 
     public Mono<A1ProtocolType> getProtocolVersion();
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientFactory.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientFactory.java
index cb8f9dd..cd53f83 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientFactory.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/A1ClientFactory.java
@@ -41,13 +41,16 @@
             return Mono.just(createStdA1ClientImpl(ric));
         } else if (version == A1ProtocolType.OSC_V1) {
             return Mono.just(new OscA1Client(ric.getConfig()));
+        } else if (version == A1ProtocolType.CONTROLLER) {
+            return Mono.just(createControllerA1Client(ric));
         }
         return Mono.error(new ServiceException("Not supported protocoltype: " + version));
     }
 
     private Mono<A1Client.A1ProtocolType> getProtocolVersion(Ric ric) {
         if (ric.getProtocolVersion() == A1ProtocolType.UNKNOWN) {
-            return fetchVersion(ric, new OscA1Client(ric.getConfig())) //
+            return fetchVersion(ric, createControllerA1Client(ric)) //
+                .onErrorResume(err -> fetchVersion(ric, new OscA1Client(ric.getConfig())))
                 .onErrorResume(err -> fetchVersion(ric, createStdA1ClientImpl(ric)))
                 .doOnNext(version -> ric.setProtocolVersion(version))
                 .doOnNext(version -> logger.debug("Recover ric: {}, protocol version:{}", ric.name(), version)) //
@@ -61,6 +64,10 @@
         return new StdA1Client(ric.getConfig());
     }
 
+    protected A1Client createControllerA1Client(Ric ric) {
+        return new ControllerA1Client(ric.getConfig());
+    }
+
     private Mono<A1Client.A1ProtocolType> fetchVersion(Ric ric, A1Client a1Client) {
         return Mono.just(a1Client) //
             .flatMap(client -> a1Client.getProtocolVersion());
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/AsyncRestClient.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/AsyncRestClient.java
index e2e5d64..2574217 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/clients/AsyncRestClient.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/AsyncRestClient.java
@@ -45,6 +45,17 @@
         this.client = WebClient.create(baseUrl);
     }
 
+    public Mono<String> post(String uri, String body) {
+        return client.post() //
+            .uri(uri) //
+            .contentType(MediaType.APPLICATION_JSON) //
+            .syncBody(body) //
+            .retrieve() //
+            .onStatus(HttpStatus::isError,
+                response -> Mono.error(new AsyncRestClientException(response.statusCode().toString()))) //
+            .bodyToMono(String.class);
+    }
+
     public Mono<String> put(String uri, String body) {
         logger.debug("PUT uri = '{}''", uri);
         return client.put() //
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/clients/ControllerA1Client.java b/policy-agent/src/main/java/org/oransc/policyagent/clients/ControllerA1Client.java
new file mode 100644
index 0000000..bc7b56d
--- /dev/null
+++ b/policy-agent/src/main/java/org/oransc/policyagent/clients/ControllerA1Client.java
@@ -0,0 +1,160 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.clients;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.oransc.policyagent.configuration.RicConfig;
+import org.oransc.policyagent.repository.Policy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class ControllerA1Client implements A1Client {
+    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private static final String A1_CONTROLLER_URL =
+        "http://admin:Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U@a1-controller-container:8181/restconf/operations";
+
+    private final RicConfig ricConfig;
+    private final AsyncRestClient restClient;
+
+    public ControllerA1Client(RicConfig ricConfig) {
+        this.ricConfig = ricConfig;
+        this.restClient = new AsyncRestClient(A1_CONTROLLER_URL);
+        logger.debug("ControllerA1Client for ric: {}", this.ricConfig.name());
+    }
+
+    @Override
+    public Mono<List<String>> getPolicyTypeIdentities() {
+        JSONObject paramsJson = new JSONObject();
+        paramsJson.put("near-rt-ric-url", ricConfig.baseUrl());
+        String inputJsonString = createInputJsonString(paramsJson);
+        logger.debug("POST getPolicyTypeIdentities inputJsonString = {}", inputJsonString);
+
+        return restClient.post("/A1-ADAPTER-API:getPolicyTypeIdentities", inputJsonString) //
+            .flatMap(response -> getValueFromResponse(response, "policy-type-id-list")) //
+            .flatMap(this::parseJsonArrayOfString);
+    }
+
+    @Override
+    public Mono<List<String>> getPolicyIdentities() {
+        JSONObject paramsJson = new JSONObject();
+        paramsJson.put("near-rt-ric-url", ricConfig.baseUrl());
+        String inputJsonString = createInputJsonString(paramsJson);
+        logger.debug("POST getPolicyIdentities inputJsonString = {}", inputJsonString);
+
+        return restClient.post("/A1-ADAPTER-API:getPolicyIdentities", inputJsonString) //
+            .flatMap(response -> getValueFromResponse(response, "policy-id-list")) //
+            .flatMap(this::parseJsonArrayOfString);
+    }
+
+    @Override
+    public Mono<String> getPolicyTypeSchema(String policyTypeId) {
+        JSONObject paramsJson = new JSONObject();
+        paramsJson.put("near-rt-ric-url", ricConfig.baseUrl());
+        paramsJson.put("policy-type-id", policyTypeId);
+        String inputJsonString = createInputJsonString(paramsJson);
+        logger.debug("POST getPolicyType inputJsonString = {}", inputJsonString);
+
+        return restClient.post("/A1-ADAPTER-API:getPolicyType", inputJsonString) //
+            .flatMap(response -> getValueFromResponse(response, "policy-type"));
+    }
+
+    @Override
+    public Mono<String> putPolicy(Policy policy) {
+        JSONObject paramsJson = new JSONObject();
+        paramsJson.put("near-rt-ric-url", ricConfig.baseUrl());
+        paramsJson.put("policy-id", policy.id());
+        paramsJson.put("policy", policy.json());
+        String inputJsonString = createInputJsonString(paramsJson);
+        logger.debug("POST putPolicy inputJsonString = {}", inputJsonString);
+
+        return restClient.post("/A1-ADAPTER-API:putPolicy", inputJsonString) //
+            .flatMap(response -> getValueFromResponse(response, "returned-policy"));
+    }
+
+    @Override
+    public Mono<String> deletePolicy(Policy policy) {
+        return deletePolicy(policy.id());
+    }
+
+    @Override
+    public Flux<String> deleteAllPolicies() {
+        return getPolicyIdentities() //
+            .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) // )
+            .flatMap(policyId -> deletePolicy(policyId)); //
+    }
+
+    public Mono<String> deletePolicy(String policyId) {
+        JSONObject paramsJson = new JSONObject();
+        paramsJson.put("near-rt-ric-url", ricConfig.baseUrl());
+        paramsJson.put("policy-id", policyId);
+        String inputJsonString = createInputJsonString(paramsJson);
+        logger.debug("POST deletePolicy inputJsonString = {}", inputJsonString);
+
+        return restClient.post("/A1-ADAPTER-API:deletePolicy", inputJsonString);
+    }
+
+    @Override
+    public Mono<A1ProtocolType> getProtocolVersion() {
+        return getPolicyTypeIdentities() //
+            .flatMap(x -> Mono.just(A1ProtocolType.CONTROLLER));
+    }
+
+    private String createInputJsonString(JSONObject paramsJson) {
+        JSONObject inputJson = new JSONObject();
+        inputJson.put("input", paramsJson);
+        return inputJson.toString();
+    }
+
+    private Mono<String> getValueFromResponse(String response, String key) {
+        logger.debug("A1 client: response = {}", response);
+        try {
+            JSONObject outputJson = new JSONObject(response);
+            JSONObject responseParams = outputJson.getJSONObject("output");
+            String value = responseParams.get(key).toString();
+            return Mono.just(value);
+        } catch (JSONException ex) { // invalid json
+            return Mono.error(ex);
+        }
+    }
+
+    private Mono<List<String>> parseJsonArrayOfString(String inputString) {
+        try {
+            List<String> arrayList = new ArrayList<>();
+            JSONArray jsonArray = new JSONArray(inputString);
+            for (int i = 0; i < jsonArray.length(); i++) {
+                arrayList.add(jsonArray.getString(i));
+            }
+            logger.debug("A1 client: received list = {}", arrayList);
+            return Mono.just(arrayList);
+        } catch (JSONException ex) { // invalid json
+            return Mono.error(ex);
+        }
+    }
+}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
index 9dc41f2..530ac98 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
@@ -25,13 +25,16 @@
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
+
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.Vector;
+
 import javax.validation.constraints.NotNull;
+
 import org.oransc.policyagent.exceptions.ServiceException;
 
 public class ApplicationConfigParser {
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
index 42be12d..7e77528 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
@@ -9,14 +9,14 @@
  */
 public interface DmaapMessageConsumer extends Runnable {
 
-	public void init(Properties baseProperties);
+    public void init(Properties baseProperties);
 
-	public abstract void processMsg(String msg) throws Exception;
+    public abstract void processMsg(String msg) throws Exception;
 
-	public boolean isReady();
+    public boolean isReady();
 
-	public boolean isRunning();
+    public boolean isRunning();
 
-	public void stopConsumer();
+    public void stopConsumer();
 
 }
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java
index 30444e2..63de197 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java
@@ -12,51 +12,51 @@
 
     private final ApplicationConfig applicationConfig;
 
-	protected MRConsumerImpl consumer;
+    protected MRConsumerImpl consumer;
 
-	@Autowired
-	public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
-		this.applicationConfig = applicationConfig;
-	}
+    @Autowired
+    public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
+        this.applicationConfig = applicationConfig;
+    }
 
-	@Override
-	public void run() {
-		// TODO Auto-generated method stub
+    @Override
+    public void run() {
+        // TODO Auto-generated method stub
 
-	}
+    }
 
-	@Override
-	public void init(Properties baseProperties) {
-	    Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig();
-		// Initialize the DMAAP with the properties
-		// TODO Auto-generated method stub
+    @Override
+    public void init(Properties baseProperties) {
+        Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig();
+        // Initialize the DMAAP with the properties
+        // TODO Auto-generated method stub
 
-	}
+    }
 
-	@Override
-	public void processMsg(String msg) throws Exception {
-		// Call the Controller once you get the message from DMAAP
-		// Call the concurrent Task executor to handle the incoming request
-		// TODO Auto-generated method stub
+    @Override
+    public void processMsg(String msg) throws Exception {
+        // Call the Controller once you get the message from DMAAP
+        // Call the concurrent Task executor to handle the incoming request
+        // TODO Auto-generated method stub
 
-	}
+    }
 
-	@Override
-	public boolean isReady() {
-		// TODO Auto-generated method stub
-		return false;
-	}
+    @Override
+    public boolean isReady() {
+        // TODO Auto-generated method stub
+        return false;
+    }
 
-	@Override
-	public boolean isRunning() {
-		// TODO Auto-generated method stub
-		return false;
-	}
+    @Override
+    public boolean isRunning() {
+        // TODO Auto-generated method stub
+        return false;
+    }
 
-	@Override
-	public void stopConsumer() {
-		// TODO Auto-generated method stub
+    @Override
+    public void stopConsumer() {
+        // TODO Auto-generated method stub
 
-	}
+    }
 
 }
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java
index 314c44c..b635cb7 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/utils/MockA1ClientFactory.java
@@ -45,6 +45,11 @@
         return getOrCreateA1Client(ric.name());
     }
 
+    @Override
+    protected A1Client createControllerA1Client(Ric ric) {
+        return getOrCreateA1Client(ric.name());
+    }
+
     public MockA1Client getOrCreateA1Client(String ricName) {
         if (!clients.containsKey(ricName)) {
             logger.debug("Creating client for RIC: {}", ricName);