Add DMaaP publisher configuration to DmaapClient

Change-Id: I59f366bb5045e344e026f8d51e8f8b611b631d73
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
diff --git a/policy-agent/pom.xml b/policy-agent/pom.xml
index 52c69d5..52761cc 100644
--- a/policy-agent/pom.xml
+++ b/policy-agent/pom.xml
@@ -41,6 +41,7 @@
 		<sdk.version>1.1.6</sdk.version>
 		<swagger.version>2.0.0</swagger.version>
 		<json.version>20180130</json.version>
+        <commons-net.version>3.3</commons-net.version>
 		<awaitility.version>4.0.1</awaitility.version>
 		<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
 		<formatter-maven-plugin.version>2.8.1</formatter-maven-plugin.version>
@@ -101,6 +102,11 @@
 			<artifactId>json</artifactId>
 			<version>${json.version}</version>
 		</dependency>
+        <dependency>
+            <groupId>commons-net</groupId>
+            <artifactId>commons-net</artifactId>
+            <version>${commons-net.version}</version>
+        </dependency>
 		<!--TEST -->
 		<dependency>
 			<groupId>org.springframework.boot</groupId>
@@ -115,6 +121,11 @@
 		<dependency>
 			<groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
 			<artifactId>cbs-client</artifactId>
+            <version>${sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+            <artifactId>dmaap-client</artifactId>
 			<version>${sdk.version}</version>
 		</dependency>
 		<!--REQUIRED TO GENERATE DOCUMENTATION -->
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
index d4f7261..673fe1d 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
@@ -23,6 +23,7 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Vector;
 
 import javax.validation.constraints.NotEmpty;
@@ -41,6 +42,7 @@
 
     private Collection<Observer> observers = new Vector<>();
     private Map<String, RicConfig> ricConfigs = new HashMap<>();
+    private Properties dmaapConsumerConfig;
 
     @Autowired
     public ApplicationConfig() {
@@ -50,6 +52,13 @@
         return this.filepath;
     }
 
+    /*
+     * Do not remove, used by framework!
+     */
+    public synchronized void setFilepath(String filepath) {
+        this.filepath = filepath;
+    }
+
     public synchronized Collection<RicConfig> getRicConfigs() {
         return this.ricConfigs.values();
     }
@@ -63,6 +72,10 @@
         throw new ServiceException("Could not find ric: " + ricName);
     }
 
+    public Properties getDmaapConsumerConfig() {
+        return dmaapConsumerConfig;
+    }
+
     public static enum RicConfigUpdate {
         ADDED, CHANGED, REMOVED
     }
@@ -85,7 +98,7 @@
         }
     }
 
-    public void setConfiguration(@NotNull Collection<RicConfig> ricConfigs) {
+    public void setConfiguration(@NotNull Collection<RicConfig> ricConfigs, Properties dmaapConsumerConfig) {
         Collection<Notification> notifications = new Vector<>();
         synchronized (this) {
             Map<String, RicConfig> newRicConfigs = new HashMap<>();
@@ -109,6 +122,8 @@
             this.ricConfigs = newRicConfigs;
         }
         notifyObservers(notifications);
+
+        this.dmaapConsumerConfig = dmaapConsumerConfig;
     }
 
     private void notifyObservers(Collection<Notification> notifications) {
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
index 75ee135..9dc41f2 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
@@ -25,32 +25,44 @@
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
-
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
 import java.util.Vector;
-
+import javax.validation.constraints.NotNull;
 import org.oransc.policyagent.exceptions.ServiceException;
 
 public class ApplicationConfigParser {
 
     private static final String CONFIG = "config";
+
     private static Gson gson = new GsonBuilder() //
         .serializeNulls() //
         .create(); //
 
     private Vector<RicConfig> ricConfig;
+    private Properties dmaapConsumerConfig;
 
     public ApplicationConfigParser() {
     }
 
     public void parse(JsonObject root) throws ServiceException {
-        JsonObject config = root.getAsJsonObject(CONFIG);
-        ricConfig = parseRics(config);
+        JsonObject ricConfigJson = root.getAsJsonObject(CONFIG);
+        ricConfig = parseRics(ricConfigJson);
+        JsonObject dmaapConfigJson = root.getAsJsonObject("streams_subscribes");
+        dmaapConsumerConfig = parseDmaapConsumerConfig(dmaapConfigJson);
     }
 
     public Vector<RicConfig> getRicConfigs() {
         return this.ricConfig;
     }
 
+    public Properties getDmaapConsumerConfig() {
+        return dmaapConsumerConfig;
+    }
+
     private Vector<RicConfig> parseRics(JsonObject config) throws ServiceException {
         Vector<RicConfig> result = new Vector<RicConfig>();
         for (JsonElement ricElem : getAsJsonArray(config, "ric")) {
@@ -71,4 +83,69 @@
         return get(obj, memberName).getAsJsonArray();
     }
 
+    private Properties parseDmaapConsumerConfig(JsonObject consumerCfg) throws ServiceException {
+        Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
+        if (topics.size() != 1) {
+            throw new ServiceException("Invalid configuration, number of topic must be one, config: " + topics);
+        }
+        JsonObject topic = topics.iterator().next().getValue().getAsJsonObject();
+        JsonObject dmaapInfo = get(topic, "dmaap_info").getAsJsonObject();
+        String topicUrl = getAsString(dmaapInfo, "topic_url");
+
+        Properties dmaapProps = new Properties();
+        try {
+            URL url = new URL(topicUrl);
+            String passwd = "";
+            String userName = "";
+            if (url.getUserInfo() != null) {
+                String[] userInfo = url.getUserInfo().split(":");
+                userName = userInfo[0];
+                passwd = userInfo[1];
+            }
+            String urlPath = url.getPath();
+            DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
+
+            dmaapProps.put("port", url.getPort());
+            dmaapProps.put("server", url.getHost());
+            dmaapProps.put("topic", path.dmaapTopicName);
+            dmaapProps.put("consumerGroup", path.consumerGroup);
+            dmaapProps.put("consumerInstance", path.consumerId);
+            dmaapProps.put("fetchTimeout", 15000);
+            dmaapProps.put("fetchLimit", 1000);
+            dmaapProps.put("userName", userName);
+            dmaapProps.put("password", passwd);
+        } catch (MalformedURLException e) {
+            throw new ServiceException("Could not parse the URL", e);
+        }
+
+        return dmaapProps;
+    }
+
+    private static @NotNull String getAsString(JsonObject obj, String memberName) throws ServiceException {
+        return get(obj, memberName).getAsString();
+    }
+
+    private class DmaapConsumerUrlPath {
+        final String dmaapTopicName;
+        final String consumerGroup;
+        final String consumerId;
+
+        DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
+            this.dmaapTopicName = dmaapTopicName;
+            this.consumerGroup = consumerGroup;
+            this.consumerId = consumerId;
+        }
+    }
+
+    private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException {
+        String[] tokens = urlPath.split("/"); // /events/A1-P/users/sdnc1
+        if (tokens.length != 5) {
+            throw new ServiceException("The path has incorrect syntax: " + urlPath);
+        }
+
+        final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/A1-P
+        final String consumerGroup = tokens[3]; // users
+        final String consumerId = tokens[4]; // sdnc1
+        return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId);
+    }
 }
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/BusTopicParams.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/BusTopicParams.java
deleted file mode 100644
index 47ab592..0000000
--- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/BusTopicParams.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.oransc.policyagent.dmaap;
-
-import org.springframework.context.annotation.Configuration;
-
-@Configuration("dmaap")
-public class BusTopicParams {
-
-	private int port;
-	private String server;
-	private String topic;
-	private String consumerGroup;
-	private String consumerInstance;
-	private int fetchTimeout;
-	private int fetchLimit;
-	private String userName;
-	private String password;
-}
\ No newline at end of file
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java
index b64a822..30444e2 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java
@@ -3,13 +3,20 @@
 import java.util.Properties;
 
 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
+import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
+@Component
 public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
 
+    private final ApplicationConfig applicationConfig;
+
 	protected MRConsumerImpl consumer;
-	
-	public DmaapMessageConsumerImpl() {
-		// TODO Auto-generated constructor stub
+
+	@Autowired
+	public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
+		this.applicationConfig = applicationConfig;
 	}
 
 	@Override
@@ -20,6 +27,7 @@
 
 	@Override
 	public void init(Properties baseProperties) {
+	    Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig();
 		// Initialize the DMAAP with the properties
 		// TODO Auto-generated method stub
 
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
index 9a8dc34..bc43eda 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
@@ -125,7 +125,7 @@
         try {
             ApplicationConfigParser parser = new ApplicationConfigParser();
             parser.parse(jsonObject);
-            this.appConfig.setConfiguration(parser.getRicConfigs());
+            this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapConsumerConfig());
         } catch (ServiceException e) {
             logger.error("Could not parse configuration {}", e.toString(), e);
         }
@@ -152,7 +152,7 @@
             }
             ApplicationConfigParser appParser = new ApplicationConfigParser();
             appParser.parse(rootObject);
-            appConfig.setConfiguration(appParser.getRicConfigs());
+            appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapConsumerConfig());
             logger.info("Local configuration file loaded: {}", filepath);
         } catch (JsonSyntaxException | ServiceException | IOException e) {
             logger.trace("Local configuration file not loaded: {}", filepath, e);
diff --git a/policy-agent/src/test/resources/test_application_configuration.json b/policy-agent/src/test/resources/test_application_configuration.json
index 446c061..c63b710 100644
--- a/policy-agent/src/test/resources/test_application_configuration.json
+++ b/policy-agent/src/test/resources/test_application_configuration.json
@@ -19,5 +19,13 @@
             ]
          }
       ]
+   },
+   "streams_subscribes": {
+      "dmaap_subscriber": {
+         "dmaap_info": {
+            "topic_url": "http://dradmin:dradmin@localhost:2222/events/A1-P/users/sdnc1"
+         },
+         "type": "message_router"
+      }
    }
 }
\ No newline at end of file